You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:22 UTC

[14/55] [abbrv] beam git commit: Refactor classes into packages

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index d311dc4..e8d791f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -26,7 +26,6 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,6 +34,35 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
+import org.apache.beam.integration.nexmark.io.PubsubHelper;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.queries.Query0;
+import org.apache.beam.integration.nexmark.queries.Query0Model;
+import org.apache.beam.integration.nexmark.queries.Query1;
+import org.apache.beam.integration.nexmark.queries.Query10;
+import org.apache.beam.integration.nexmark.queries.Query11;
+import org.apache.beam.integration.nexmark.queries.Query12;
+import org.apache.beam.integration.nexmark.queries.Query1Model;
+import org.apache.beam.integration.nexmark.queries.Query2;
+import org.apache.beam.integration.nexmark.queries.Query2Model;
+import org.apache.beam.integration.nexmark.queries.Query3;
+import org.apache.beam.integration.nexmark.queries.Query3Model;
+import org.apache.beam.integration.nexmark.queries.Query4;
+import org.apache.beam.integration.nexmark.queries.Query4Model;
+import org.apache.beam.integration.nexmark.queries.Query5;
+import org.apache.beam.integration.nexmark.queries.Query5Model;
+import org.apache.beam.integration.nexmark.queries.Query6;
+import org.apache.beam.integration.nexmark.queries.Query6Model;
+import org.apache.beam.integration.nexmark.queries.Query7;
+import org.apache.beam.integration.nexmark.queries.Query7Model;
+import org.apache.beam.integration.nexmark.queries.Query8;
+import org.apache.beam.integration.nexmark.queries.Query8Model;
+import org.apache.beam.integration.nexmark.queries.Query9;
+import org.apache.beam.integration.nexmark.queries.Query9Model;
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -56,7 +84,7 @@ import org.joda.time.Duration;
 /**
  * Run a single Nexmark query using a given configuration.
  */
-public abstract class NexmarkRunner<OptionT extends Options> {
+public abstract class NexmarkRunner<OptionT extends NexmarkOptions> {
   /**
    * Minimum number of samples needed for 'stead-state' rate calculation.
    */
@@ -84,7 +112,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
    */
   private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays(3);
   /**
-   * Options shared by all runs.
+   * NexmarkOptions shared by all runs.
    */
   protected final OptionT options;
 
@@ -359,7 +387,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
     return perf;
   }
 
-  String getJobId(PipelineResult job) {
+  protected String getJobId(PipelineResult job) {
     return "";
   }
 
@@ -461,7 +489,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
   /**
    * Build and run a pipeline using specified options.
    */
-  protected interface PipelineBuilder<OptionT extends Options> {
+  protected interface PipelineBuilder<OptionT extends NexmarkOptions> {
     void build(OptionT publishOnlyOptions);
   }
 
@@ -966,7 +994,7 @@ public abstract class NexmarkRunner<OptionT extends Options> {
             // We'll shutdown the publisher job when we notice the main job has finished.
             invokeBuilderForPublishOnlyPipeline(new PipelineBuilder() {
               @Override
-              public void build(Options publishOnlyOptions) {
+              public void build(NexmarkOptions publishOnlyOptions) {
                 Pipeline sp = Pipeline.create(options);
                 NexmarkUtils.setupPipeline(configuration.coderStrategy, sp);
                 publisherMonitor = new Monitor<Event>(queryName, "publisher");

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
deleted file mode 100644
index a46d38a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
-/**
- * Run NexMark queries using the Spark runner.
- */
-class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> {
-    /**
-     * Command line flags.
-     */
-    public interface NexmarkSparkOptions extends Options, SparkPipelineOptions {
-    }
-
-    /**
-     * Entry point.
-     */
-    public static void main(String[] args) {
-        NexmarkSparkOptions options =
-                PipelineOptionsFactory.fromArgs(args)
-                        .withValidation()
-                        .as(NexmarkSparkOptions.class);
-//        options.setRunner(org.apache.beam.runners.spark.SparkRunner.class);
-        options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class);
-        NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
-        new NexmarkSparkDriver().runAll(options, runner);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
deleted file mode 100644
index 30ae9ca..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-/**
- * Run a query using the Spark runner.
- */
-public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> {
-    @Override
-    protected boolean isStreaming() {
-        return options.isStreaming();
-    }
-
-    @Override
-    protected int coresPerWorker() {
-        return 4;
-    }
-
-    @Override
-    protected int maxNumWorkers() {
-        return 5;
-    }
-
-    @Override
-    protected void invokeBuilderForPublishOnlyPipeline(
-            PipelineBuilder builder) {
-        builder.build(options);
-    }
-
-    @Override
-    protected void waitForPublisherPreload() {
-        throw new UnsupportedOperationException();
-    }
-
-
-    public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
-        super(options);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
index bc47540..be7d7b8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSuite.java
@@ -100,7 +100,7 @@ public enum NexmarkSuite {
    * with any set command line flags, except for --isStreaming which is only respected for
    * the {@link #DEFAULT} suite.
    */
-  public Iterable<NexmarkConfiguration> getConfigurations(Options options) {
+  public Iterable<NexmarkConfiguration> getConfigurations(NexmarkOptions options) {
     Set<NexmarkConfiguration> results = new LinkedHashSet<>();
     for (NexmarkConfiguration configuration : configurations) {
       NexmarkConfiguration result = configuration.clone();

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index f7417d3..b0421a4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -20,14 +20,30 @@ package org.apache.beam.integration.nexmark;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
-
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.AuctionCount;
+import org.apache.beam.integration.nexmark.model.AuctionPrice;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.BidsPerSession;
+import org.apache.beam.integration.nexmark.model.CategoryPrice;
+import org.apache.beam.integration.nexmark.model.Done;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.IdNameReserve;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.NameCityStateId;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.model.SellerPrice;
+import org.apache.beam.integration.nexmark.sources.BoundedEventSource;
+import org.apache.beam.integration.nexmark.sources.Generator;
+import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
+import org.apache.beam.integration.nexmark.sources.UnboundedEventSource;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -67,7 +83,7 @@ public class NexmarkUtils {
   /**
    * Mapper for (de)serializing JSON.
    */
-  static final ObjectMapper MAPPER = new ObjectMapper();
+  public static final ObjectMapper MAPPER = new ObjectMapper();
 
   /**
    * Possible sources for events.
@@ -382,7 +398,8 @@ public class NexmarkUtils {
    */
   public static PTransform<PBegin, PCollection<Event>> batchEventsSource(
           NexmarkConfiguration configuration) {
-    return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration), configuration.numEventGenerators));
+    return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration),
+      configuration.numEventGenerators));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java
deleted file mode 100644
index 388473d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Options.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PubsubOptions;
-
-/**
- * Command line flags.
- */
-public interface Options extends PubsubOptions {
-  @Description("Which suite to run. Default is to use command line arguments for one job.")
-  @Default.Enum("DEFAULT")
-  NexmarkSuite getSuite();
-
-  void setSuite(NexmarkSuite suite);
-
-  @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.")
-  @Default.Boolean(false)
-  boolean getMonitorJobs();
-
-  void setMonitorJobs(boolean monitorJobs);
-
-  @Description("Where the events come from.")
-  @Nullable
-  NexmarkUtils.SourceType getSourceType();
-
-  void setSourceType(NexmarkUtils.SourceType sourceType);
-
-  @Description("Prefix for input files if using avro input")
-  @Nullable
-  String getInputPath();
-
-  void setInputPath(String inputPath);
-
-  @Description("Where results go.")
-  @Nullable
-  NexmarkUtils.SinkType getSinkType();
-
-  void setSinkType(NexmarkUtils.SinkType sinkType);
-
-  @Description("Which mode to run in when source is PUBSUB.")
-  @Nullable
-  NexmarkUtils.PubSubMode getPubSubMode();
-
-  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
-
-  @Description("Which query to run.")
-  @Nullable
-  Integer getQuery();
-
-  void setQuery(Integer query);
-
-  @Description("Prefix for output files if using text output for results or running Query 10.")
-  @Nullable
-  String getOutputPath();
-
-  void setOutputPath(String outputPath);
-
-  @Description("Base name of pubsub topic to publish to in streaming mode.")
-  @Nullable
-  @Default.String("nexmark")
-  String getPubsubTopic();
-
-  void setPubsubTopic(String pubsubTopic);
-
-  @Description("Base name of pubsub subscription to read from in streaming mode.")
-  @Nullable
-  @Default.String("nexmark")
-  String getPubsubSubscription();
-
-  void setPubsubSubscription(String pubsubSubscription);
-
-  @Description("Base name of BigQuery table name if using BigQuery output.")
-  @Nullable
-  @Default.String("nexmark")
-  String getBigQueryTable();
-
-  void setBigQueryTable(String bigQueryTable);
-
-  @Description("Approximate number of events to generate. "
-               + "Zero for effectively unlimited in streaming mode.")
-  @Nullable
-  Long getNumEvents();
-
-  void setNumEvents(Long numEvents);
-
-  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
-               + "of the pipeline.")
-  @Nullable
-  Integer getPreloadSeconds();
-
-  void setPreloadSeconds(Integer preloadSeconds);
-
-  @Description("Number of unbounded sources to create events.")
-  @Nullable
-  Integer getNumEventGenerators();
-
-  void setNumEventGenerators(Integer numEventGenerators);
-
-  @Description("Shape of event rate curve.")
-  @Nullable
-  NexmarkUtils.RateShape getRateShape();
-
-  void setRateShape(NexmarkUtils.RateShape rateShape);
-
-  @Description("Initial overall event rate (in --rateUnit).")
-  @Nullable
-  Integer getFirstEventRate();
-
-  void setFirstEventRate(Integer firstEventRate);
-
-  @Description("Next overall event rate (in --rateUnit).")
-  @Nullable
-  Integer getNextEventRate();
-
-  void setNextEventRate(Integer nextEventRate);
-
-  @Description("Unit for rates.")
-  @Nullable
-  NexmarkUtils.RateUnit getRateUnit();
-
-  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
-
-  @Description("Overall period of rate shape, in seconds.")
-  @Nullable
-  Integer getRatePeriodSec();
-
-  void setRatePeriodSec(Integer ratePeriodSec);
-
-  @Description("If true, relay events in real time in streaming mode.")
-  @Nullable
-  Boolean getIsRateLimited();
-
-  void setIsRateLimited(Boolean isRateLimited);
-
-  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
-               + " time in the past so that multiple runs will see exactly the same event streams"
-               + " and should thus have exactly the same results.")
-  @Nullable
-  Boolean getUseWallclockEventTime();
-
-  void setUseWallclockEventTime(Boolean useWallclockEventTime);
-
-  @Description("Assert pipeline results match model results.")
-  @Nullable
-  boolean getAssertCorrectness();
-
-  void setAssertCorrectness(boolean assertCorrectness);
-
-  @Description("Log all input events.")
-  @Nullable
-  boolean getLogEvents();
-
-  void setLogEvents(boolean logEvents);
-
-  @Description("Log all query results.")
-  @Nullable
-  boolean getLogResults();
-
-  void setLogResults(boolean logResults);
-
-  @Description("Average size in bytes for a person record.")
-  @Nullable
-  Integer getAvgPersonByteSize();
-
-  void setAvgPersonByteSize(Integer avgPersonByteSize);
-
-  @Description("Average size in bytes for an auction record.")
-  @Nullable
-  Integer getAvgAuctionByteSize();
-
-  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
-
-  @Description("Average size in bytes for a bid record.")
-  @Nullable
-  Integer getAvgBidByteSize();
-
-  void setAvgBidByteSize(Integer avgBidByteSize);
-
-  @Description("Ratio of bids for 'hot' auctions above the background.")
-  @Nullable
-  Integer getHotAuctionRatio();
-
-  void setHotAuctionRatio(Integer hotAuctionRatio);
-
-  @Description("Ratio of auctions for 'hot' sellers above the background.")
-  @Nullable
-  Integer getHotSellersRatio();
-
-  void setHotSellersRatio(Integer hotSellersRatio);
-
-  @Description("Ratio of auctions for 'hot' bidders above the background.")
-  @Nullable
-  Integer getHotBiddersRatio();
-
-  void setHotBiddersRatio(Integer hotBiddersRatio);
-
-  @Description("Window size in seconds.")
-  @Nullable
-  Long getWindowSizeSec();
-
-  void setWindowSizeSec(Long windowSizeSec);
-
-  @Description("Window period in seconds.")
-  @Nullable
-  Long getWindowPeriodSec();
-
-  void setWindowPeriodSec(Long windowPeriodSec);
-
-  @Description("If in streaming mode, the holdback for watermark in seconds.")
-  @Nullable
-  Long getWatermarkHoldbackSec();
-
-  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
-
-  @Description("Roughly how many auctions should be in flight for each generator.")
-  @Nullable
-  Integer getNumInFlightAuctions();
-
-  void setNumInFlightAuctions(Integer numInFlightAuctions);
-
-
-  @Description("Maximum number of people to consider as active for placing auctions or bids.")
-  @Nullable
-  Integer getNumActivePeople();
-
-  void setNumActivePeople(Integer numActivePeople);
-
-  @Description("Filename of perf data to append to.")
-  @Nullable
-  String getPerfFilename();
-
-  void setPerfFilename(String perfFilename);
-
-  @Description("Filename of baseline perf data to read from.")
-  @Nullable
-  String getBaselineFilename();
-
-  void setBaselineFilename(String baselineFilename);
-
-  @Description("Filename of summary perf data to append to.")
-  @Nullable
-  String getSummaryFilename();
-
-  void setSummaryFilename(String summaryFilename);
-
-  @Description("Filename for javascript capturing all perf data and any baselines.")
-  @Nullable
-  String getJavascriptFilename();
-
-  void setJavascriptFilename(String javascriptFilename);
-
-  @Description("If true, don't run the actual query. Instead, calculate the distribution "
-               + "of number of query results per (event time) minute according to the query model.")
-  @Nullable
-  boolean getJustModelResultRate();
-
-  void setJustModelResultRate(boolean justModelResultRate);
-
-  @Description("Coder strategy to use.")
-  @Nullable
-  NexmarkUtils.CoderStrategy getCoderStrategy();
-
-  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
-
-  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
-               + "number of milliseconds to simulate CPU-bound computation.")
-  @Nullable
-  Long getCpuDelayMs();
-
-  void setCpuDelayMs(Long cpuDelayMs);
-
-  @Description("Extra data, in bytes, to save to persistent state for each event. "
-               + "This will force I/O all the way to durable storage to simulate an "
-               + "I/O-bound computation.")
-  @Nullable
-  Long getDiskBusyBytes();
-
-  void setDiskBusyBytes(Long diskBusyBytes);
-
-  @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction")
-  @Nullable
-  Integer getAuctionSkip();
-
-  void setAuctionSkip(Integer auctionSkip);
-
-  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
-  @Nullable
-  Integer getFanout();
-
-  void setFanout(Integer fanout);
-
-  @Description("Length of occasional delay to impose on events (in seconds).")
-  @Nullable
-  Long getOccasionalDelaySec();
-
-  void setOccasionalDelaySec(Long occasionalDelaySec);
-
-  @Description("Probability that an event will be delayed by delayS.")
-  @Nullable
-  Double getProbDelayedEvent();
-
-  void setProbDelayedEvent(Double probDelayedEvent);
-
-  @Description("Maximum size of each log file (in events). For Query10 only.")
-  @Nullable
-  Integer getMaxLogEvents();
-
-  void setMaxLogEvents(Integer maxLogEvents);
-
-  @Description("How to derive names of resources.")
-  @Default.Enum("QUERY_AND_SALT")
-  NexmarkUtils.ResourceNameMode getResourceNameMode();
-
-  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
-
-  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
-  @Default.Boolean(true)
-  boolean getManageResources();
-
-  void setManageResources(boolean manageResources);
-
-  @Description("If true, use pub/sub publish time instead of event time.")
-  @Nullable
-  Boolean getUsePubsubPublishTime();
-
-  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
-
-  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
-               + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
-  @Nullable
-  Long getOutOfOrderGroupSize();
-
-  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
-
-  @Description("If false, do not add the Monitor and Snoop transforms.")
-  @Nullable
-  Boolean getDebug();
-
-  void setDebug(Boolean value);
-
-  @Description("If set, cancel running pipelines after this long")
-  @Nullable
-  Long getRunningTimeMinutes();
-
-  void setRunningTimeMinutes(Long value);
-
-  @Description("If set and --monitorJobs is true, check that the system watermark is never more "
-               + "than this far behind real time")
-  @Nullable
-  Long getMaxSystemLagSeconds();
-
-  void setMaxSystemLagSeconds(Long value);
-
-  @Description("If set and --monitorJobs is true, check that the data watermark is never more "
-               + "than this far behind real time")
-  @Nullable
-  Long getMaxDataLagSeconds();
-
-  void setMaxDataLagSeconds(Long value);
-
-  @Description("Only start validating watermarks after this many seconds")
-  @Nullable
-  Long getWatermarkValidationDelaySeconds();
-
-  void setWatermarkValidationDelaySeconds(Long value);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java
deleted file mode 100644
index 251a6ee..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Person.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * A person either creating an auction or making a bid.
- */
-public class Person implements KnownSize, Serializable {
-  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-  public static final Coder<Person> CODER = new AtomicCoder<Person>() {
-    @Override
-    public void encode(Person value, OutputStream outStream,
-        Coder.Context context)
-        throws CoderException, IOException {
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-      STRING_CODER.encode(value.name, outStream, Context.NESTED);
-      STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
-      STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
-      STRING_CODER.encode(value.city, outStream, Context.NESTED);
-      STRING_CODER.encode(value.state, outStream, Context.NESTED);
-      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
-      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
-    }
-
-    @Override
-    public Person decode(
-        InputStream inStream, Coder.Context context)
-        throws CoderException, IOException {
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      String name = STRING_CODER.decode(inStream, Context.NESTED);
-      String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
-      String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
-      String city = STRING_CODER.decode(inStream, Context.NESTED);
-      String state = STRING_CODER.decode(inStream, Context.NESTED);
-      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
-      String extra = STRING_CODER.decode(inStream, Context.NESTED);
-      return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
-    }
-  };
-
-  /** Id of person. */
-  @JsonProperty
-  public final long id; // primary key
-
-  /** Extra person properties. */
-  @JsonProperty
-  public final String name;
-
-  @JsonProperty
-  public final String emailAddress;
-
-  @JsonProperty
-  public final String creditCard;
-
-  @JsonProperty
-  public final String city;
-
-  @JsonProperty
-  public final String state;
-
-  @JsonProperty
-  public final long dateTime;
-
-  /** Additional arbitrary payload for performance testing. */
-  @JsonProperty
-  public final String extra;
-
-  // For Avro only.
-  @SuppressWarnings("unused")
-  private Person() {
-    id = 0;
-    name = null;
-    emailAddress = null;
-    creditCard = null;
-    city = null;
-    state = null;
-    dateTime = 0;
-    extra = null;
-  }
-
-  public Person(long id, String name, String emailAddress, String creditCard, String city,
-      String state, long dateTime, String extra) {
-    this.id = id;
-    this.name = name;
-    this.emailAddress = emailAddress;
-    this.creditCard = creditCard;
-    this.city = city;
-    this.state = state;
-    this.dateTime = dateTime;
-    this.extra = extra;
-  }
-
-  /**
-   * Return a copy of person which capture the given annotation.
-   * (Used for debugging).
-   */
-  public Person withAnnotation(String annotation) {
-    return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
-        annotation + ": " + extra);
-  }
-
-  /**
-   * Does person have {@code annotation}? (Used for debugging.)
-   */
-  public boolean hasAnnotation(String annotation) {
-    return extra.startsWith(annotation + ": ");
-  }
-
-  /**
-   * Remove {@code annotation} from person. (Used for debugging.)
-   */
-  public Person withoutAnnotation(String annotation) {
-    if (hasAnnotation(annotation)) {
-      return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
-          extra.substring(annotation.length() + 2));
-    } else {
-      return this;
-    }
-  }
-
-  @Override
-  public long sizeInBytes() {
-    return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
-        + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
deleted file mode 100644
index a79a25b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubJsonClient;
-
-/**
- * Helper for working with pubsub.
- */
-public class PubsubHelper implements AutoCloseable {
-  /**
-   * Underlying pub/sub client.
-   */
-  private final PubsubClient pubsubClient;
-
-  /**
-   * Project id.
-   */
-  private final String projectId;
-
-  /**
-   * Topics we should delete on close.
-   */
-  private final List<PubsubClient.TopicPath> createdTopics;
-
-  /**
-   * Subscriptions we should delete on close.
-   */
-  private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
-
-  private PubsubHelper(PubsubClient pubsubClient, String projectId) {
-    this.pubsubClient = pubsubClient;
-    this.projectId = projectId;
-    createdTopics = new ArrayList<>();
-    createdSubscriptions = new ArrayList<>();
-  }
-
-  /**
-   * Create a helper.
-   */
-  public static PubsubHelper create(PubsubOptions options) {
-    try {
-      return new PubsubHelper(
-          PubsubJsonClient.FACTORY.newClient(null, null, options),
-          options.getProject());
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create Pubsub client: ", e);
-    }
-  }
-
-  /**
-   * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
-   * deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.TopicPath createTopic(String shortTopic) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    try {
-      if (topicExists(shortTopic)) {
-        NexmarkUtils.console("attempting to cleanup topic %s", topic);
-        pubsubClient.deleteTopic(topic);
-      }
-      NexmarkUtils.console("create topic %s", topic);
-      pubsubClient.createTopic(topic);
-      createdTopics.add(topic);
-      return topic;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
-    }
-  }
-
-  /**
-   * Create a topic from short name if it does not already exist. The topic will not be
-   * deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    try {
-      if (topicExists(shortTopic)) {
-        NexmarkUtils.console("topic %s already exists", topic);
-        return topic;
-      }
-      NexmarkUtils.console("create topic %s", topic);
-      pubsubClient.createTopic(topic);
-      return topic;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
-    }
-  }
-
-  /**
-   * Check a topic corresponding to short name exists, and throw exception if not. The
-   * topic will not be deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.TopicPath reuseTopic(String shortTopic) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    if (topicExists(shortTopic)) {
-      NexmarkUtils.console("reusing existing topic %s", topic);
-      return topic;
-    }
-    throw new RuntimeException("topic '" + topic + "' does not already exist");
-  }
-
-  /**
-   * Does topic corresponding to short name exist?
-   */
-  public boolean topicExists(String shortTopic) {
-    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    try {
-      Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
-      return existingTopics.contains(topic);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
-    }
-  }
-
-  /**
-   * Create subscription from short name. Delete subscription if it already exists. Ensure the
-   * subscription will be deleted on cleanup. Return full subscription name.
-   */
-  public PubsubClient.SubscriptionPath createSubscription(
-      String shortTopic, String shortSubscription) {
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    PubsubClient.SubscriptionPath subscription =
-        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
-    try {
-      if (subscriptionExists(shortTopic, shortSubscription)) {
-        NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
-        pubsubClient.deleteSubscription(subscription);
-      }
-      NexmarkUtils.console("create subscription %s", subscription);
-      pubsubClient.createSubscription(topic, subscription, 60);
-      createdSubscriptions.add(subscription);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
-    }
-    return subscription;
-  }
-
-  /**
-   * Check a subscription corresponding to short name exists, and throw exception if not. The
-   * subscription will not be deleted on cleanup. Return full topic name.
-   */
-  public PubsubClient.SubscriptionPath reuseSubscription(
-      String shortTopic, String shortSubscription) {
-    PubsubClient.SubscriptionPath subscription =
-        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
-    if (subscriptionExists(shortTopic, shortSubscription)) {
-      NexmarkUtils.console("reusing existing subscription %s", subscription);
-      return subscription;
-    }
-    throw new RuntimeException("subscription'" + subscription + "' does not already exist");
-  }
-
-  /**
-   * Does subscription corresponding to short name exist?
-   */
-  public boolean subscriptionExists(String shortTopic, String shortSubscription) {
-    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
-    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
-    PubsubClient.SubscriptionPath subscription =
-        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
-    try {
-      Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
-          pubsubClient.listSubscriptions(project, topic);
-      return existingSubscriptions.contains(subscription);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e);
-    }
-  }
-
-  /**
-   * Delete all the subscriptions and topics we created.
-   */
-  @Override
-  public void close() {
-    for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
-      try {
-        NexmarkUtils.console("delete subscription %s", subscription);
-        pubsubClient.deleteSubscription(subscription);
-      } catch (IOException ex) {
-        NexmarkUtils.console("could not delete subscription %s", subscription);
-      }
-    }
-    for (PubsubClient.TopicPath topic : createdTopics) {
-      try {
-        NexmarkUtils.console("delete topic %s", topic);
-        pubsubClient.deleteTopic(topic);
-      } catch (IOException ex) {
-        NexmarkUtils.console("could not delete topic %s", topic);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java
deleted file mode 100644
index e88fce0..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 0: Pass events through unchanged. However, force them to do a round trip through
- * serialization so that we measure the impact of the choice of coders.
- */
-public class Query0 extends NexmarkQuery {
-  public Query0(NexmarkConfiguration configuration) {
-    super(configuration, "Query0");
-  }
-
-  private PCollection<Event> applyTyped(PCollection<Event> events) {
-    final Coder<Event> coder = events.getCoder();
-    return events
-        // Force round trip through coder.
-        .apply(name + ".Serialize",
-            ParDo.of(new DoFn<Event, Event>() {
-                  private final Aggregator<Long, Long> bytes =
-                      createAggregator("bytes", Sum.ofLongs());
-
-                  @ProcessElement
-                  public void processElement(ProcessContext c) throws CoderException, IOException {
-                    ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-                    coder.encode(c.element(), outStream, Coder.Context.OUTER);
-                    byte[] byteArray = outStream.toByteArray();
-                    bytes.addValue((long) byteArray.length);
-                    ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
-                    Event event = coder.decode(inStream, Coder.Context.OUTER);
-                    c.output(event);
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
deleted file mode 100644
index 37e3f93..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query0}.
- */
-public class Query0Model extends NexmarkQueryModel {
-  /**
-   * Simulator for query 0.
-   */
-  private class Simulator extends AbstractSimulator<Event, Event> {
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> timestampedEvent = nextInput();
-      if (timestampedEvent == null) {
-        allDone();
-        return;
-      }
-      addResult(timestampedEvent);
-      //TODO test fails because offset of some hundreds of ms beween expect and actual
-    }
-  }
-
-  public Query0Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  protected AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValueTimestampOrder(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java
deleted file mode 100644
index a1ecdeb..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
- * In CQL syntax:
- *
- * <pre>
- * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
- * FROM bid [ROWS UNBOUNDED];
- * </pre>
- *
- * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
- * slowed down.
- */
-class Query1 extends NexmarkQuery {
-  public Query1(NexmarkConfiguration configuration) {
-    super(configuration, "Query1");
-  }
-
-  private PCollection<Bid> applyTyped(PCollection<Event> events) {
-    return events
-        // Only want the bid events.
-        .apply(JUST_BIDS)
-
-        // Map the conversion function over all bids.
-        .apply(name + ".ToEuros",
-            ParDo.of(new DoFn<Bid, Bid>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Bid bid = c.element();
-                    c.output(new Bid(
-                        bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra));
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java
deleted file mode 100644
index 7bdcb36..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query10.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.GcsOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Query "10", 'Log to sharded files' (Not in original suite.)
- *
- * <p>Every windowSizeSec, save all events from the last period into 2*maxWorkers log files.
- */
-class Query10 extends NexmarkQuery {
-  private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
-  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
-  private static final int NUM_SHARDS_PER_WORKER = 5;
-  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
-
-  /**
-   * Capture everything we need to know about the records in a single output file.
-   */
-  private static class OutputFile implements Serializable {
-    /** Maximum possible timestamp of records in file. */
-    private final Instant maxTimestamp;
-    /** Shard within window. */
-    private final String shard;
-    /** Index of file in all files in shard. */
-    private final long index;
-    /** Timing of records in this file. */
-    private final PaneInfo.Timing timing;
-    /** Path to file containing records, or {@literal null} if no output required. */
-    @Nullable
-    private final String filename;
-
-    public OutputFile(
-        Instant maxTimestamp,
-        String shard,
-        long index,
-        PaneInfo.Timing timing,
-        @Nullable String filename) {
-      this.maxTimestamp = maxTimestamp;
-      this.shard = shard;
-      this.index = index;
-      this.timing = timing;
-      this.filename = filename;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
-    }
-  }
-
-  /**
-   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
-   */
-  @Nullable
-  private String outputPath;
-
-  /**
-   * Maximum number of workers, used to determine log sharding factor.
-   */
-  private int maxNumWorkers;
-
-  public Query10(NexmarkConfiguration configuration) {
-    super(configuration, "Query10");
-  }
-
-  public void setOutputPath(@Nullable String outputPath) {
-    this.outputPath = outputPath;
-  }
-
-  public void setMaxNumWorkers(int maxNumWorkers) {
-    this.maxNumWorkers = maxNumWorkers;
-  }
-
-  /**
-   * Return channel for writing bytes to GCS.
-   *
-   * @throws IOException
-   */
-  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
-      throws IOException {
-    WritableByteChannel channel =
-            GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain");
-    checkState(channel instanceof GoogleCloudStorageWriteChannel);
-    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
-    return channel;
-  }
-
-  /** Return a short string to describe {@code timing}. */
-  private String timingToString(PaneInfo.Timing timing) {
-    switch (timing) {
-      case EARLY:
-        return "E";
-      case ON_TIME:
-        return "O";
-      case LATE:
-        return "L";
-    }
-    throw new RuntimeException(); // cases are exhaustive
-  }
-
-  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
-  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
-    @Nullable String filename =
-        outputPath == null
-        ? null
-        : String.format("%s/LOG-%s-%s-%03d-%s-%x",
-            outputPath, window.maxTimestamp(), shard, pane.getIndex(),
-            timingToString(pane.getTiming()),
-            ThreadLocalRandom.current().nextLong());
-    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
-        pane.getTiming(), filename);
-  }
-
-  /**
-   * Return path to which we should write the index for {@code window}, or {@literal null}
-   * if no output required.
-   */
-  @Nullable
-  private String indexPathFor(BoundedWindow window) {
-    if (outputPath == null) {
-      return null;
-    }
-    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
-  }
-
-  private PCollection<Done> applyTyped(PCollection<Event> events) {
-    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
-
-    return events.apply(name + ".ShardEvents",
-            ParDo.of(new DoFn<Event, KV<String, Event>>() {
-                      final Aggregator<Long, Long> lateCounter =
-                          createAggregator("actuallyLateEvent", Sum.ofLongs());
-                      final Aggregator<Long, Long> onTimeCounter =
-                          createAggregator("actuallyOnTimeEvent", Sum.ofLongs());
-
-                      @ProcessElement
-                      public void processElement(ProcessContext c) {
-                        if (c.element().hasAnnotation("LATE")) {
-                          lateCounter.addValue(1L);
-                          LOG.error("Observed late: %s", c.element());
-                        } else {
-                          onTimeCounter.addValue(1L);
-                        }
-                        int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
-                        String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
-                        c.output(KV.of(shard, c.element()));
-                      }
-                    }))
-        .apply(name + ".WindowEvents",
-                Window.<KV<String, Event>>into(
-            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-            .triggering(AfterEach.inOrder(
-                Repeatedly
-                    .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
-                    .orFinally(AfterWatermark.pastEndOfWindow()),
-                Repeatedly.forever(
-                    AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
-                        AfterProcessingTime.pastFirstElementInPane()
-                                           .plusDelayOf(LATE_BATCHING_PERIOD)))))
-            .discardingFiredPanes()
-            // Use a 1 day allowed lateness so that any forgotten hold will stall the
-            // pipeline for that period and be very noticeable.
-            .withAllowedLateness(Duration.standardDays(1)))
-        .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
-        .apply(name + ".CheckForLateEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                     KV<String, Iterable<Event>>>() {
-                   final Aggregator<Long, Long> earlyCounter =
-                       createAggregator("earlyShard", Sum.ofLongs());
-                   final Aggregator<Long, Long> onTimeCounter =
-                       createAggregator("onTimeShard", Sum.ofLongs());
-                   final Aggregator<Long, Long> lateCounter =
-                       createAggregator("lateShard", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedLatePaneCounter =
-                       createAggregator("ERROR_unexpectedLatePane", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
-                       createAggregator("ERROR_unexpectedOnTimeElement", Sum.ofLongs());
-
-                   @ProcessElement
-                   public void processElement(ProcessContext c, BoundedWindow window) {
-                     int numLate = 0;
-                     int numOnTime = 0;
-                     for (Event event : c.element().getValue()) {
-                       if (event.hasAnnotation("LATE")) {
-                         numLate++;
-                       } else {
-                         numOnTime++;
-                       }
-                     }
-                     String shard = c.element().getKey();
-                     LOG.error(
-                         "%s with timestamp %s has %d actually late and %d on-time "
-                             + "elements in pane %s for window %s",
-                         shard, c.timestamp(), numLate, numOnTime, c.pane(),
-                         window.maxTimestamp());
-                     if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
-                       if (numLate == 0) {
-                         LOG.error(
-                             "ERROR! No late events in late pane for %s", shard);
-                         unexpectedLatePaneCounter.addValue(1L);
-                       }
-                       if (numOnTime > 0) {
-                         LOG.error(
-                             "ERROR! Have %d on-time events in late pane for %s",
-                             numOnTime, shard);
-                         unexpectedOnTimeElementCounter.addValue(1L);
-                       }
-                       lateCounter.addValue(1L);
-                     } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
-                       if (numOnTime + numLate < configuration.maxLogEvents) {
-                         LOG.error(
-                             "ERROR! Only have %d events in early pane for %s",
-                             numOnTime + numLate, shard);
-                       }
-                       earlyCounter.addValue(1L);
-                     } else {
-                       onTimeCounter.addValue(1L);
-                     }
-                     c.output(c.element());
-                   }
-                 }))
-        .apply(name + ".UploadEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                     KV<Void, OutputFile>>() {
-                   final Aggregator<Long, Long> savedFileCounter =
-                       createAggregator("savedFile", Sum.ofLongs());
-                   final Aggregator<Long, Long> writtenRecordsCounter =
-                       createAggregator("writtenRecords", Sum.ofLongs());
-
-                   @ProcessElement
-                   public void processElement(ProcessContext c, BoundedWindow window)
-                           throws IOException {
-                     String shard = c.element().getKey();
-                     GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                     OutputFile outputFile = outputFileFor(window, shard, c.pane());
-                     LOG.error(
-                         "Writing %s with record timestamp %s, window timestamp %s, pane %s",
-                         shard, c.timestamp(), window.maxTimestamp(), c.pane());
-                     if (outputFile.filename != null) {
-                       LOG.error("Beginning write to '%s'", outputFile.filename);
-                       int n = 0;
-                       try (OutputStream output =
-                                Channels.newOutputStream(openWritableGcsFile(options, outputFile
-                                    .filename))) {
-                         for (Event event : c.element().getValue()) {
-                           Event.CODER.encode(event, output, Coder.Context.OUTER);
-                           writtenRecordsCounter.addValue(1L);
-                           if (++n % 10000 == 0) {
-                             LOG.error("So far written %d records to '%s'", n,
-                                 outputFile.filename);
-                           }
-                         }
-                       }
-                       LOG.error("Written all %d records to '%s'", n, outputFile.filename);
-                     }
-                     savedFileCounter.addValue(1L);
-                     c.output(KV.<Void, OutputFile>of(null, outputFile));
-                   }
-                 }))
-        // Clear fancy triggering from above.
-        .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
-            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-            .triggering(AfterWatermark.pastEndOfWindow())
-            // We expect no late data here, but we'll assume the worst so we can detect any.
-            .withAllowedLateness(Duration.standardDays(1))
-            .discardingFiredPanes())
-      // TODO etienne: unnecessary groupByKey? because aggregators are shared in parallel
-      // and Pardo is also in parallel, why group all elements in memory of the same executor?
-      .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
-        .apply(name + ".Index",
-            ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
-                   final Aggregator<Long, Long> unexpectedLateCounter =
-                       createAggregator("ERROR_unexpectedLate", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedEarlyCounter =
-                       createAggregator("ERROR_unexpectedEarly", Sum.ofLongs());
-                   final Aggregator<Long, Long> unexpectedIndexCounter =
-                       createAggregator("ERROR_unexpectedIndex", Sum.ofLongs());
-                   final Aggregator<Long, Long> finalizedCounter =
-                       createAggregator("indexed", Sum.ofLongs());
-
-                   @ProcessElement
-                   public void processElement(ProcessContext c, BoundedWindow window)
-                           throws IOException {
-                     if (c.pane().getTiming() == Timing.LATE) {
-                       unexpectedLateCounter.addValue(1L);
-                       LOG.error("ERROR! Unexpected LATE pane: %s", c.pane());
-                     } else if (c.pane().getTiming() == Timing.EARLY) {
-                       unexpectedEarlyCounter.addValue(1L);
-                       LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane());
-                     } else if (c.pane().getTiming() == Timing.ON_TIME
-                         && c.pane().getIndex() != 0) {
-                       unexpectedIndexCounter.addValue(1L);
-                       LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
-                     } else {
-                       GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                       LOG.error(
-                           "Index with record timestamp %s, window timestamp %s, pane %s",
-                           c.timestamp(), window.maxTimestamp(), c.pane());
-
-                       @Nullable String filename = indexPathFor(window);
-                       if (filename != null) {
-                         LOG.error("Beginning write to '%s'", filename);
-                         int n = 0;
-                         try (OutputStream output =
-                                  Channels.newOutputStream(
-                                      openWritableGcsFile(options, filename))) {
-                           for (OutputFile outputFile : c.element().getValue()) {
-                             output.write(outputFile.toString().getBytes());
-                             n++;
-                           }
-                         }
-                         LOG.error("Written all %d lines to '%s'", n, filename);
-                       }
-                       c.output(
-                           new Done("written for timestamp " + window.maxTimestamp()));
-                       finalizedCounter.addValue(1L);
-                     }
-                   }
-                 }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java
deleted file mode 100644
index d610b7c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query11.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query "11", 'User sessions' (Not in original suite.)
- *
- * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap.
- * However limit the session to at most {@code maxLogEvents}. Emit the number of
- * bids per session.
- */
-class Query11 extends NexmarkQuery {
-  public Query11(NexmarkConfiguration configuration) {
-    super(configuration, "Query11");
-  }
-
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    return events.apply(JUST_BIDS)
-        .apply(name + ".Rekey",
-          // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
-          ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Bid bid = c.element();
-                    c.output(KV.of(bid.bidder, (Void) null));
-                  }
-                }))
-        .apply(Window.<KV<Long, Void>>into(
-            Sessions.withGapDuration(Duration.standardSeconds(configuration.windowSizeSec)))
-        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
-        .discardingFiredPanes()
-        .withAllowedLateness(Duration.standardSeconds(configuration.occasionalDelaySec / 2)))
-        .apply(Count.<Long, Void>perKey())
-        .apply(name + ".ToResult",
-            ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java
deleted file mode 100644
index 72fbb57..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query12.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query "12", 'Processing time windows' (Not in original suite.)
- *
- * <p>Group bids by the same user into processing time windows of windowSize. Emit the count
- * of bids per window.
- */
-class Query12 extends NexmarkQuery {
-  public Query12(NexmarkConfiguration configuration) {
-    super(configuration, "Query12");
-  }
-
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    return events
-        .apply(JUST_BIDS)
-        .apply(name + ".Rekey",
-          // TODO etienne: why not avoid this ParDo and do a Cont.perElement?
-            ParDo.of(new DoFn<Bid, KV<Long, Void>>() {
-                   @ProcessElement
-                   public void processElement(ProcessContext c) {
-                     Bid bid = c.element();
-                     c.output(KV.of(bid.bidder, (Void) null));
-                   }
-                 }))
-        .apply(Window.<KV<Long, Void>>into(new GlobalWindows())
-            .triggering(
-                Repeatedly.forever(
-                    AfterProcessingTime.pastFirstElementInPane()
-                                       .plusDelayOf(
-                                           Duration.standardSeconds(configuration.windowSizeSec))))
-            .discardingFiredPanes()
-            .withAllowedLateness(Duration.ZERO))
-        .apply(Count.<Long, Void>perKey())
-        .apply(name + ".ToResult",
-            ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-                   @ProcessElement
-                   public void processElement(ProcessContext c) {
-                     c.output(
-                         new BidsPerSession(c.element().getKey(), c.element().getValue()));
-                   }
-                 }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
deleted file mode 100644
index 16287e6..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query1}.
- */
-public class Query1Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 1.
-   */
-  private class Simulator extends AbstractSimulator<Event, Bid> {
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> timestampedEvent = nextInput();
-      if (timestampedEvent == null) {
-        allDone();
-        return;
-      }
-      Event event = timestampedEvent.getValue();
-      if (event.bid == null) {
-        // Ignore non-bid events.
-        return;
-      }
-      Bid bid = event.bid;
-      Bid resultBid =
-          new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra);
-      TimestampedValue<Bid> result =
-          TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
-      addResult(result);
-      //TODO test fails because offset of some hundreds of ms beween expect and actual
-    }
-  }
-
-  public Query1Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValueTimestampOrder(itr);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java
deleted file mode 100644
index 828cdf5..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 2, 'Filtering. Find bids with specific auction ids and show their bid price.
- * In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(auction, price)
- * FROM Bid [NOW]
- * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
- * </pre>
- *
- * <p>As written that query will only yield a few hundred results over event streams of
- * arbitrary size. To make it more interesting we instead choose bids for every
- * {@code auctionSkip}'th auction.
- */
-class Query2 extends NexmarkQuery {
-  public Query2(NexmarkConfiguration configuration) {
-    super(configuration, "Query2");
-  }
-
-  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
-    return events
-        // Only want the bid events.
-        .apply(JUST_BIDS)
-
-        // Select just the bids for the auctions we care about.
-        .apply(Filter.by(new SerializableFunction<Bid, Boolean>() {
-          @Override
-          public Boolean apply(Bid bid) {
-            return bid.auction % configuration.auctionSkip == 0;
-          }
-        }))
-
-        // Project just auction id and price.
-        .apply(name + ".Project",
-            ParDo.of(new DoFn<Bid, AuctionPrice>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    Bid bid = c.element();
-                    c.output(new AuctionPrice(bid.auction, bid.price));
-                  }
-                }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
deleted file mode 100644
index 7769e52..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query2Model.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query2}.
- */
-public class Query2Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 2.
-   */
-  private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> timestampedEvent = nextInput();
-      if (timestampedEvent == null) {
-        allDone();
-        return;
-      }
-      Event event = timestampedEvent.getValue();
-      if (event.bid == null) {
-        // Ignore non bid events.
-        return;
-      }
-      Bid bid = event.bid;
-      if (bid.auction % configuration.auctionSkip != 0) {
-        // Ignore bids for auctions we don't care about.
-        return;
-      }
-      AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price);
-      TimestampedValue<AuctionPrice> result =
-          TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp());
-      addResult(result);
-    }
-  }
-
-  public Query2Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValueTimestampOrder(itr);
-  }
-}