You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:19 UTC

[11/55] [abbrv] beam git commit: Refactor classes into packages

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
new file mode 100644
index 0000000..6512cc1
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query4.
+ */
+public class CategoryPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
+    @Override
+    public void encode(CategoryPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.category, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
+    }
+
+    @Override
+    public CategoryPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long category = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      boolean isLast = INT_CODER.decode(inStream, context) != 0;
+      return new CategoryPrice(category, price, isLast);
+    }
+  };
+
+  @JsonProperty
+  public final long category;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  @JsonProperty
+  public final boolean isLast;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private CategoryPrice() {
+    category = 0;
+    price = 0;
+    isLast = false;
+  }
+
+  public CategoryPrice(long category, long price, boolean isLast) {
+    this.category = category;
+    this.price = price;
+    this.isLast = isLast;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8 + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
new file mode 100644
index 0000000..6009463
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * Result of query 10.
+ */
+public class Done implements KnownSize, Serializable {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<Done> CODER = new AtomicCoder<Done>() {
+    @Override
+    public void encode(Done value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.message, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Done decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      String message = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Done(message);
+    }
+  };
+
+  @JsonProperty
+  public final String message;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  public Done() {
+    message = null;
+  }
+
+  public Done(String message) {
+    this.message = message;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return message.length();
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
new file mode 100644
index 0000000..8a278bf
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+/**
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
+ * or a {@link Bid}.
+ */
+public class Event implements KnownSize, Serializable {
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+    @Override
+    public void encode(Event value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value.newPerson != null) {
+        INT_CODER.encode(0, outStream, Context.NESTED);
+        Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+      } else if (value.newAuction != null) {
+        INT_CODER.encode(1, outStream, Context.NESTED);
+        Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+      } else if (value.bid != null) {
+        INT_CODER.encode(2, outStream, Context.NESTED);
+        Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+      } else {
+        throw new RuntimeException("invalid event");
+      }
+    }
+
+    @Override
+    public Event decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      int tag = INT_CODER.decode(inStream, context);
+      if (tag == 0) {
+        Person person = Person.CODER.decode(inStream, Context.NESTED);
+        return new Event(person);
+      } else if (tag == 1) {
+        Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+        return new Event(auction);
+      } else if (tag == 2) {
+        Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+        return new Event(bid);
+      } else {
+        throw new RuntimeException("invalid event encoding");
+      }
+    }
+  };
+
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Person newPerson;
+
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Auction newAuction;
+
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Bid bid;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Event() {
+    newPerson = null;
+    newAuction = null;
+    bid = null;
+  }
+
+  public Event(Person newPerson) {
+    this.newPerson = newPerson;
+    newAuction = null;
+    bid = null;
+  }
+
+  public Event(Auction newAuction) {
+    newPerson = null;
+    this.newAuction = newAuction;
+    bid = null;
+  }
+
+  public Event(Bid bid) {
+    newPerson = null;
+    newAuction = null;
+    this.bid = bid;
+  }
+
+  /**
+   * Return a copy of event which captures {@code annotation}.
+   * (Used for debugging).
+   */
+  public Event withAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withAnnotation(annotation));
+    } else {
+      return new Event(bid.withAnnotation(annotation));
+    }
+  }
+
+  /**
+   * Does event have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    if (newPerson != null) {
+      return newPerson.hasAnnotation(annotation);
+    } else if (newAuction != null) {
+      return newAuction.hasAnnotation(annotation);
+    } else {
+      return bid.hasAnnotation(annotation);
+    }
+  }
+
+  /**
+   * Remove {@code annotation} from event. (Used for debugging.)
+   */
+  public Event withoutAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withoutAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withoutAnnotation(annotation));
+    } else {
+      return new Event(bid.withoutAnnotation(annotation));
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    if (newPerson != null) {
+      return 1 + newPerson.sizeInBytes();
+    } else if (newAuction != null) {
+      return 1 + newAuction.sizeInBytes();
+    } else if (bid != null) {
+      return 1 + bid.sizeInBytes();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  @Override
+  public String toString() {
+    if (newPerson != null) {
+      return newPerson.toString();
+    } else if (newAuction != null) {
+      return newAuction.toString();
+    } else if (bid != null) {
+      return bid.toString();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
new file mode 100644
index 0000000..5d22651
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result type of Query8.
+ */
+public class IdNameReserve implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
+    @Override
+    public void encode(IdNameReserve value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+    }
+
+    @Override
+    public IdNameReserve decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      String name = STRING_CODER.decode(inStream, Context.NESTED);
+      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+      return new IdNameReserve(id, name, reserve);
+    }
+  };
+
+  @JsonProperty
+  public final long id;
+
+  @JsonProperty
+  public final String name;
+
+  /** Reserve price in cents. */
+  @JsonProperty
+  public final long reserve;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private IdNameReserve() {
+    id = 0;
+    name = null;
+    reserve = 0;
+  }
+
+  public IdNameReserve(long id, String name, long reserve) {
+    this.id = id;
+    this.name = name;
+    this.reserve = reserve;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + name.length() + 1 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
new file mode 100644
index 0000000..c742eac
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/KnownSize.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+/**
+ * Interface for elements which can quickly estimate their encoded byte size.
+ */
+public interface KnownSize {
+  long sizeInBytes();
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
new file mode 100644
index 0000000..ac22879
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query3.
+ */
+public class NameCityStateId implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
+    @Override
+    public void encode(NameCityStateId value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      STRING_CODER.encode(value.city, outStream, Context.NESTED);
+      STRING_CODER.encode(value.state, outStream, Context.NESTED);
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+    }
+
+    @Override
+    public NameCityStateId decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      String name = STRING_CODER.decode(inStream, Context.NESTED);
+      String city = STRING_CODER.decode(inStream, Context.NESTED);
+      String state = STRING_CODER.decode(inStream, Context.NESTED);
+      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      return new NameCityStateId(name, city, state, id);
+    }
+  };
+
+  @JsonProperty
+  public final String name;
+
+  @JsonProperty
+  public final String city;
+
+  @JsonProperty
+  public final String state;
+
+  @JsonProperty
+  public final long id;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private NameCityStateId() {
+    name = null;
+    city = null;
+    state = null;
+    id = 0;
+  }
+
+  public NameCityStateId(String name, String city, String state, long id) {
+    this.name = name;
+    this.city = city;
+    this.state = state;
+    this.id = id;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
new file mode 100644
index 0000000..85c7183
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * A person either creating an auction or making a bid.
+ */
+public class Person implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  public static final Coder<Person> CODER = new AtomicCoder<Person>() {
+    @Override
+    public void encode(Person value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
+      STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
+      STRING_CODER.encode(value.city, outStream, Context.NESTED);
+      STRING_CODER.encode(value.state, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Person decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      String name = STRING_CODER.decode(inStream, Context.NESTED);
+      String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
+      String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
+      String city = STRING_CODER.decode(inStream, Context.NESTED);
+      String state = STRING_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
+    }
+  };
+
+  /** Id of person. */
+  @JsonProperty
+  public final long id; // primary key
+
+  /** Extra person properties. */
+  @JsonProperty
+  public final String name;
+
+  @JsonProperty
+  public final String emailAddress;
+
+  @JsonProperty
+  public final String creditCard;
+
+  @JsonProperty
+  public final String city;
+
+  @JsonProperty
+  public final String state;
+
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Person() {
+    id = 0;
+    name = null;
+    emailAddress = null;
+    creditCard = null;
+    city = null;
+    state = null;
+    dateTime = 0;
+    extra = null;
+  }
+
+  public Person(long id, String name, String emailAddress, String creditCard, String city,
+      String state, long dateTime, String extra) {
+    this.id = id;
+    this.name = name;
+    this.emailAddress = emailAddress;
+    this.creditCard = creditCard;
+    this.city = city;
+    this.state = state;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of person which capture the given annotation.
+   * (Used for debugging).
+   */
+  public Person withAnnotation(String annotation) {
+    return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+        annotation + ": " + extra);
+  }
+
+  /**
+   * Does person have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from person. (Used for debugging.)
+   */
+  public Person withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+          extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
+        + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
new file mode 100644
index 0000000..b7c2b14
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query6.
+ */
+public class SellerPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() {
+    @Override
+    public void encode(SellerPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+    }
+
+    @Override
+    public SellerPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long seller = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      return new SellerPrice(seller, price);
+    }
+  };
+
+  @JsonProperty
+  public final long seller;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private SellerPrice() {
+    seller = 0;
+    price = 0;
+  }
+
+  public SellerPrice(long seller, long price) {
+    this.seller = seller;
+    this.price = price;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
new file mode 100644
index 0000000..e1d6113
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Benchmark Model.
+ */
+package org.apache.beam.integration.nexmark.model;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
index 65bf7d4..df6f09f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Nexmark Benchmark Integration Queries.
+ * Nexmark.
  */
 package org.apache.beam.integration.nexmark;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
new file mode 100644
index 0000000..f60d5de
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 0: Pass events through unchanged. However, force them to do a round trip through
+ * serialization so that we measure the impact of the choice of coders.
+ */
+public class Query0 extends NexmarkQuery {
+  public Query0(NexmarkConfiguration configuration) {
+    super(configuration, "Query0");
+  }
+
+  private PCollection<Event> applyTyped(PCollection<Event> events) {
+    final Coder<Event> coder = events.getCoder();
+    return events
+        // Force round trip through coder.
+        .apply(name + ".Serialize",
+            ParDo.of(new DoFn<Event, Event>() {
+                  private final Aggregator<Long, Long> bytes =
+                      createAggregator("bytes", Sum.ofLongs());
+
+                  @ProcessElement
+                  public void processElement(ProcessContext c) throws CoderException, IOException {
+                    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+                    coder.encode(c.element(), outStream, Coder.Context.OUTER);
+                    byte[] byteArray = outStream.toByteArray();
+                    bytes.addValue((long) byteArray.length);
+                    ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
+                    Event event = coder.decode(inStream, Coder.Context.OUTER);
+                    c.output(event);
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
new file mode 100644
index 0000000..991b1d4
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.integration.nexmark.AbstractSimulator;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query0}.
+ */
+public class Query0Model extends NexmarkQueryModel {
+  /**
+   * Simulator for query 0.
+   */
+  private class Simulator extends AbstractSimulator<Event, Event> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        allDone();
+        return;
+      }
+      addResult(timestampedEvent);
+      //TODO test fails because offset of some hundreds of ms beween expect and actual
+    }
+  }
+
+  public Query0Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  protected AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
new file mode 100644
index 0000000..0be77ce
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
+ * FROM bid [ROWS UNBOUNDED];
+ * </pre>
+ *
+ * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
+ * slowed down.
+ */
+public class Query1 extends NexmarkQuery {
+  public Query1(NexmarkConfiguration configuration) {
+    super(configuration, "Query1");
+  }
+
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+    return events
+        // Only want the bid events.
+        .apply(JUST_BIDS)
+
+        // Map the conversion function over all bids.
+        .apply(name + ".ToEuros",
+            ParDo.of(new DoFn<Bid, Bid>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    Bid bid = c.element();
+                    c.output(new Bid(
+                        bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
new file mode 100644
index 0000000..6912ed1
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Done;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Query "10", 'Log to sharded files' (Not in original suite.)
+ *
+ * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files.
+ */
+public class Query10 extends NexmarkQuery {
+  private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
+  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
+  private static final int NUM_SHARDS_PER_WORKER = 5;
+  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
+
+  /**
+   * Capture everything we need to know about the records in a single output file.
+   */
+  private static class OutputFile implements Serializable {
+    /** Maximum possible timestamp of records in file. */
+    private final Instant maxTimestamp;
+    /** Shard within window. */
+    private final String shard;
+    /** Index of file in all files in shard. */
+    private final long index;
+    /** Timing of records in this file. */
+    private final PaneInfo.Timing timing;
+    /** Path to file containing records, or {@literal null} if no output required. */
+    @Nullable
+    private final String filename;
+
+    public OutputFile(
+        Instant maxTimestamp,
+        String shard,
+        long index,
+        PaneInfo.Timing timing,
+        @Nullable String filename) {
+      this.maxTimestamp = maxTimestamp;
+      this.shard = shard;
+      this.index = index;
+      this.timing = timing;
+      this.filename = filename;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
+    }
+  }
+
+  /**
+   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
+   */
+  @Nullable
+  private String outputPath;
+
+  /**
+   * Maximum number of workers, used to determine log sharding factor.
+   */
+  private int maxNumWorkers;
+
+  public Query10(NexmarkConfiguration configuration) {
+    super(configuration, "Query10");
+  }
+
+  public void setOutputPath(@Nullable String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public void setMaxNumWorkers(int maxNumWorkers) {
+    this.maxNumWorkers = maxNumWorkers;
+  }
+
+  /**
+   * Return channel for writing bytes to GCS.
+   *
+   * @throws IOException
+   */
+  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
+      throws IOException {
+    WritableByteChannel channel =
+            GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
+    checkState(channel instanceof GoogleCloudStorageWriteChannel);
+    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
+    return channel;
+  }
+
+  /** Return a short string to describe {@code timing}. */
+  private String timingToString(PaneInfo.Timing timing) {
+    switch (timing) {
+      case EARLY:
+        return "E";
+      case ON_TIME:
+        return "O";
+      case LATE:
+        return "L";
+    }
+    throw new RuntimeException(); // cases are exhaustive
+  }
+
+  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
+  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
+    @Nullable String filename =
+        outputPath == null
+        ? null
+        : String.format("%s/LOG-%s-%s-%03d-%s-%x",
+            outputPath, window.maxTimestamp(), shard, pane.getIndex(),
+            timingToString(pane.getTiming()),
+            ThreadLocalRandom.current().nextLong());
+    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
+        pane.getTiming(), filename);
+  }
+
+  /**
+   * Return path to which we should write the index for {@code window}, or {@literal null}
+   * if no output required.
+   */
+  @Nullable
+  private String indexPathFor(BoundedWindow window) {
+    if (outputPath == null) {
+      return null;
+    }
+    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
+  }
+
+  private PCollection<Done> applyTyped(PCollection<Event> events) {
+    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
+
+    return events.apply(name + ".ShardEvents",
+            ParDo.of(new DoFn<Event, KV<String, Event>>() {
+                      final Aggregator<Long, Long> lateCounter =
+                          createAggregator("actuallyLateEvent", Sum.ofLongs());
+                      final Aggregator<Long, Long> onTimeCounter =
+                          createAggregator("actuallyOnTimeEvent", Sum.ofLongs());
+
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        if (c.element().hasAnnotation("LATE")) {
+                          lateCounter.addValue(1L);
+                          LOG.error("Observed late: %s", c.element());
+                        } else {
+                          onTimeCounter.addValue(1L);
+                        }
+                        int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+                        String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+                        c.output(KV.of(shard, c.element()));
+                      }
+                    }))
+        .apply(name + ".WindowEvents",
+                Window.<KV<String, Event>>into(
+            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+            .triggering(AfterEach.inOrder(
+                Repeatedly
+                    .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
+                Repeatedly.forever(
+                    AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+                        AfterProcessingTime.pastFirstElementInPane()
+                                           .plusDelayOf(LATE_BATCHING_PERIOD)))))
+            .discardingFiredPanes()
+            // Use a 1 day allowed lateness so that any forgotten hold will stall the
+            // pipeline for that period and be very noticeable.
+            .withAllowedLateness(Duration.standardDays(1)))
+        .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
+        .apply(name + ".CheckForLateEvents",
+            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                     KV<String, Iterable<Event>>>() {
+                   final Aggregator<Long, Long> earlyCounter =
+                       createAggregator("earlyShard", Sum.ofLongs());
+                   final Aggregator<Long, Long> onTimeCounter =
+                       createAggregator("onTimeShard", Sum.ofLongs());
+                   final Aggregator<Long, Long> lateCounter =
+                       createAggregator("lateShard", Sum.ofLongs());
+                   final Aggregator<Long, Long> unexpectedLatePaneCounter =
+                       createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs());
+                   final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
+                       createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs());
+
+                   @ProcessElement
+                   public void processElement(ProcessContext c, BoundedWindow window) {
+                     int numLate = 0;
+                     int numOnTime = 0;
+                     for (Event event : c.element().getValue()) {
+                       if (event.hasAnnotation("LATE")) {
+                         numLate++;
+                       } else {
+                         numOnTime++;
+                       }
+                     }
+                     String shard = c.element().getKey();
+                     LOG.error(
+                         "%s with timestamp %s has %d actually late and %d on-time "
+                             + "elements in pane %s for window %s",
+                         shard, c.timestamp(), numLate, numOnTime, c.pane(),
+                         window.maxTimestamp());
+                     if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+                       if (numLate == 0) {
+                         LOG.error(
+                             "ERROR! No late events in late pane for %s", shard);
+                         unexpectedLatePaneCounter.addValue(1L);
+                       }
+                       if (numOnTime > 0) {
+                         LOG.error(
+                             "ERROR! Have %d on-time events in late pane for %s",
+                             numOnTime, shard);
+                         unexpectedOnTimeElementCounter.addValue(1L);
+                       }
+                       lateCounter.addValue(1L);
+                     } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+                       if (numOnTime + numLate < configuration.maxLogEvents) {
+                         LOG.error(
+                             "ERROR! Only have %d events in early pane for %s",
+                             numOnTime + numLate, shard);
+                       }
+                       earlyCounter.addValue(1L);
+                     } else {
+                       onTimeCounter.addValue(1L);
+                     }
+                     c.output(c.element());
+                   }
+                 }))
+        .apply(name + ".UploadEvents",
+            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                     KV<Void, OutputFile>>() {
+                   final Aggregator<Long, Long> savedFileCounter =
+                       createAggregator("savedFile", Sum.ofLongs());
+                   final Aggregator<Long, Long> writtenRecordsCounter =
+                       createAggregator("writtenRecords", Sum.ofLongs());
+
+                   @ProcessElement
+                   public void processElement(ProcessContext c, BoundedWindow window)
+                           throws IOException {
+                     String shard = c.element().getKey();
+                     GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+                     OutputFile outputFile = outputFileFor(window, shard, c.pane());
+                     LOG.error(
+                         "Writing %s with record timestamp %s, window timestamp %s, pane %s",
+                         shard, c.timestamp(), window.maxTimestamp(), c.pane());
+                     if (outputFile.filename != null) {
+                       LOG.error("Beginning write to '%s'", outputFile.filename);
+                       int n = 0;
+                       try (OutputStream output =
+                                Channels.newOutputStream(openWritableGcsFile(options, outputFile
+                                    .filename))) {
+                         for (Event event : c.element().getValue()) {
+                           Event.CODER.encode(event, output, Coder.Context.OUTER);
+                           writtenRecordsCounter.addValue(1L);
+                           if (++n % 10000 == 0) {
+                             LOG.error("So far written %d records to '%s'", n,
+                                 outputFile.filename);
+                           }
+                         }
+                       }
+                       LOG.error("Written all %d records to '%s'", n, outputFile.filename);
+                     }
+                     savedFileCounter.addValue(1L);
+                     c.output(KV.<Void, OutputFile>of(null, outputFile));
+                   }
+                 }))
+        // Clear fancy triggering from above.
+        .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
+            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+            .triggering(AfterWatermark.pastEndOfWindow())
+            // We expect no late data here, but we'll assume the worst so we can detect any.
+            .withAllowedLateness(Duration.standardDays(1))
+            .discardingFiredPanes())
+      // TODO etienne: unnecessary groupByKey? because aggregators are shared in parallel
+      // and Pardo is also in parallel, why group all elements in memory of the same executor?
+      .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
+        .apply(name + ".Index",
+            ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
+                   final Aggregator<Long, Long> unexpectedLateCounter =
+                       createAggregator("ERROR_unexpectedLate", Sum.ofLongs());
+                   final Aggregator<Long, Long> unexpectedEarlyCounter =
+                       createAggregator("ERROR_unexpectedEarly", Sum.ofLongs());
+                   final Aggregator<Long, Long> unexpectedIndexCounter =
+                       createAggregator("ERROR_unexpectedIndex", Sum.ofLongs());
+                   final Aggregator<Long, Long> finalizedCounter =
+                       createAggregator("indexed", Sum.ofLongs());
+
+                   @ProcessElement
+                   public void processElement(ProcessContext c, BoundedWindow window)
+                           throws IOException {
+                     if (c.pane().getTiming() == Timing.LATE) {
+                       unexpectedLateCounter.addValue(1L);
+                       LOG.error("ERROR! Unexpected LATE pane: %s", c.pane());
+                     } else if (c.pane().getTiming() == Timing.EARLY) {
+                       unexpectedEarlyCounter.addValue(1L);
+                       LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane());
+                     } else if (c.pane().getTiming() == Timing.ON_TIME
+                         && c.pane().getIndex() != 0) {
+                       unexpectedIndexCounter.addValue(1L);
+                       LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
+                     } else {
+                       GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+                       LOG.error(
+                           "Index with record timestamp %s, window timestamp %s, pane %s",
+                           c.timestamp(), window.maxTimestamp(), c.pane());
+
+                       @Nullable String filename = indexPathFor(window);
+                       if (filename != null) {
+                         LOG.error("Beginning write to '%s'", filename);
+                         int n = 0;
+                         try (OutputStream output =
+                                  Channels.newOutputStream(
+                                      openWritableGcsFile(options, filename))) {
+                           for (OutputFile outputFile : c.element().getValue()) {
+                             output.write(outputFile.toString().getBytes());
+                             n++;
+                           }
+                         }
+                         LOG.error("Written all %d lines to '%s'", n, filename);
+                       }
+                       c.output(
+                           new Done("written for timestamp " + window.maxTimestamp()));
+                       finalizedCounter.addValue(1L);
+                     }
+                   }
+                 }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
new file mode 100644
index 0000000..4da99eb
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.BidsPerSession;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "11", 'User sessions' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap.
+ * However limit the session to at most {@code maxLogEvents}. Emit the number of
+ * bids per session.
+ */
+public class Query11 extends NexmarkQuery {
+  public Query11(NexmarkConfiguration configuration) {
+    super(configuration, "Query11");
+  }
+
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    return events.apply(JUST_BIDS)
+        .apply(name + ".Rekey",
+          // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
+          ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    Bid bid = c.element();
+                    c.output(KV.of(bid.bidder, (Void) null));
+                  }
+                }))
+        .apply(Window.<KV<Long, Void>>into(
+            Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
+        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
+        .discardingFiredPanes()
+        .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)))
+        .apply(Count.<Long, Void>perKey())
+        .apply(name + ".ToResult",
+            ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
new file mode 100644
index 0000000..c67401b
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.BidsPerSession;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "12", 'Processing time windows' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into processing time windows of windowSize. Emit the count
+ * of bids per window.
+ */
+public class Query12 extends NexmarkQuery {
+  public Query12(NexmarkConfiguration configuration) {
+    super(configuration, "Query12");
+  }
+
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    return events
+        .apply(JUST_BIDS)
+        .apply(name + ".Rekey",
+          // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
+            ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
+                   @ProcessElement
+                   public void processElement(ProcessContext c) {
+                     Bid bid = c.element();
+                     c.output(KV.of(bid.bidder, (Void) null));
+                   }
+                 }))
+        .apply(Window.<KV<Long, Void>>into(new GlobalWindows())
+            .triggering(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane()
+                                       .plusDelayOf(
+                                           Duration.standardSeconds(configuration.windowSizeSec))))
+            .discardingFiredPanes()
+            .withAllowedLateness(Duration.ZERO))
+        .apply(Count.<Long, Void>perKey())
+        .apply(name + ".ToResult",
+            ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
+                   @ProcessElement
+                   public void processElement(ProcessContext c) {
+                     c.output(
+                         new BidsPerSession(c.element().getKey(), c.element().getValue()));
+                   }
+                 }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
new file mode 100644
index 0000000..58037d3
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.integration.nexmark.AbstractSimulator;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query1}.
+ */
+public class Query1Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 1.
+   */
+  private class Simulator extends AbstractSimulator<Event, Bid> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        allDone();
+        return;
+      }
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+      Bid bid = event.bid;
+      Bid resultBid =
+          new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra);
+      TimestampedValue<Bid> result =
+          TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
+      addResult(result);
+      //TODO test fails because offset of some hundreds of ms beween expect and actual
+    }
+  }
+
+  public Query1Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
new file mode 100644
index 0000000..4c8f878
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.AuctionPrice;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 2, 'Filtering. Find bids with specific auction ids and show their bid price.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(auction, price)
+ * FROM Bid [NOW]
+ * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
+ * </pre>
+ *
+ * <p>As written that query will only yield a few hundred results over event streams of
+ * arbitrary size. To make it more interesting we instead choose bids for every
+ * {@code auctionSkip}'th auction.
+ */
+public class Query2 extends NexmarkQuery {
+  public Query2(NexmarkConfiguration configuration) {
+    super(configuration, "Query2");
+  }
+
+  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
+    return events
+        // Only want the bid events.
+        .apply(JUST_BIDS)
+
+        // Select just the bids for the auctions we care about.
+        .apply(Filter.by(new SerializableFunction<Bid, Boolean>() {
+          @Override
+          public Boolean apply(Bid bid) {
+            return bid.auction % configuration.auctionSkip == 0;
+          }
+        }))
+
+        // Project just auction id and price.
+        .apply(name + ".Project",
+            ParDo.of(new DoFn<Bid, AuctionPrice>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    Bid bid = c.element();
+                    c.output(new AuctionPrice(bid.auction, bid.price));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
new file mode 100644
index 0000000..f578e4c
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.integration.nexmark.AbstractSimulator;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkQueryModel;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.AuctionPrice;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query2}.
+ */
+public class Query2Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 2.
+   */
+  private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        allDone();
+        return;
+      }
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non bid events.
+        return;
+      }
+      Bid bid = event.bid;
+      if (bid.auction % configuration.auctionSkip != 0) {
+        // Ignore bids for auctions we don't care about.
+        return;
+      }
+      AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price);
+      TimestampedValue<AuctionPrice> result =
+          TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp());
+      addResult(result);
+    }
+  }
+
+  public Query2Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}