You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:10:00 UTC

[52/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

Move module beam-integration-java-nexmark to beam-sdks-java-nexmark


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4333df7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4333df7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4333df7

Branch: refs/heads/master
Commit: f4333df77267d5207f0f23ae62e79b171a00e8a7
Parents: 2f9b494
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Thu Jun 15 11:55:26 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:29 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/README.md              |  340 -----
 integration/java/nexmark/pom.xml                |  292 -----
 .../apache/beam/integration/nexmark/Main.java   |  304 -----
 .../beam/integration/nexmark/Monitor.java       |   79 --
 .../nexmark/NexmarkConfiguration.java           |  721 -----------
 .../integration/nexmark/NexmarkLauncher.java    | 1158 ------------------
 .../integration/nexmark/NexmarkOptions.java     |  403 ------
 .../beam/integration/nexmark/NexmarkPerf.java   |  208 ----
 .../beam/integration/nexmark/NexmarkSuite.java  |  112 --
 .../beam/integration/nexmark/NexmarkUtils.java  |  672 ----------
 .../beam/integration/nexmark/model/Auction.java |  187 ---
 .../integration/nexmark/model/AuctionBid.java   |   84 --
 .../integration/nexmark/model/AuctionCount.java |   84 --
 .../integration/nexmark/model/AuctionPrice.java |   88 --
 .../beam/integration/nexmark/model/Bid.java     |  177 ---
 .../nexmark/model/BidsPerSession.java           |   87 --
 .../nexmark/model/CategoryPrice.java            |   97 --
 .../beam/integration/nexmark/model/Done.java    |   80 --
 .../beam/integration/nexmark/model/Event.java   |  171 ---
 .../nexmark/model/IdNameReserve.java            |   98 --
 .../integration/nexmark/model/KnownSize.java    |   26 -
 .../nexmark/model/NameCityStateId.java          |  103 --
 .../beam/integration/nexmark/model/Person.java  |  163 ---
 .../integration/nexmark/model/SellerPrice.java  |   89 --
 .../integration/nexmark/model/package-info.java |   22 -
 .../beam/integration/nexmark/package-info.java  |   21 -
 .../nexmark/queries/AbstractSimulator.java      |  211 ----
 .../nexmark/queries/NexmarkQuery.java           |  270 ----
 .../nexmark/queries/NexmarkQueryModel.java      |  118 --
 .../integration/nexmark/queries/Query0.java     |   71 --
 .../nexmark/queries/Query0Model.java            |   64 -
 .../integration/nexmark/queries/Query1.java     |   67 -
 .../integration/nexmark/queries/Query10.java    |  367 ------
 .../integration/nexmark/queries/Query11.java    |   79 --
 .../integration/nexmark/queries/Query12.java    |   80 --
 .../nexmark/queries/Query1Model.java            |   76 --
 .../integration/nexmark/queries/Query2.java     |   79 --
 .../nexmark/queries/Query2Model.java            |   80 --
 .../integration/nexmark/queries/Query3.java     |  301 -----
 .../nexmark/queries/Query3Model.java            |  124 --
 .../integration/nexmark/queries/Query4.java     |  116 --
 .../nexmark/queries/Query4Model.java            |  186 ---
 .../integration/nexmark/queries/Query5.java     |  138 ---
 .../nexmark/queries/Query5Model.java            |  176 ---
 .../integration/nexmark/queries/Query6.java     |  155 ---
 .../nexmark/queries/Query6Model.java            |  133 --
 .../integration/nexmark/queries/Query7.java     |   90 --
 .../nexmark/queries/Query7Model.java            |  130 --
 .../integration/nexmark/queries/Query8.java     |   97 --
 .../nexmark/queries/Query8Model.java            |  148 ---
 .../integration/nexmark/queries/Query9.java     |   44 -
 .../nexmark/queries/Query9Model.java            |   44 -
 .../nexmark/queries/WinningBids.java            |  412 -------
 .../nexmark/queries/WinningBidsSimulator.java   |  206 ----
 .../nexmark/queries/package-info.java           |   22 -
 .../nexmark/sources/BoundedEventSource.java     |  190 ---
 .../integration/nexmark/sources/Generator.java  |  609 ---------
 .../nexmark/sources/GeneratorConfig.java        |  301 -----
 .../nexmark/sources/UnboundedEventSource.java   |  330 -----
 .../nexmark/sources/package-info.java           |   22 -
 .../nexmark/src/main/resources/log4j.properties |   55 -
 .../integration/nexmark/queries/QueryTest.java  |  185 ---
 .../nexmark/sources/BoundedEventSourceTest.java |   71 --
 .../nexmark/sources/GeneratorTest.java          |  111 --
 .../sources/UnboundedEventSourceTest.java       |  107 --
 integration/java/pom.xml                        |   37 -
 integration/pom.xml                             |   51 -
 pom.xml                                         |    1 -
 sdks/java/nexmark/README.md                     |  340 +++++
 sdks/java/nexmark/pom.xml                       |  292 +++++
 .../java/org/apache/beam/sdk/nexmark/Main.java  |  303 +++++
 .../org/apache/beam/sdk/nexmark/Monitor.java    |   78 ++
 .../beam/sdk/nexmark/NexmarkConfiguration.java  |  721 +++++++++++
 .../beam/sdk/nexmark/NexmarkLauncher.java       | 1157 +++++++++++++++++
 .../apache/beam/sdk/nexmark/NexmarkOptions.java |  403 ++++++
 .../apache/beam/sdk/nexmark/NexmarkPerf.java    |  207 ++++
 .../apache/beam/sdk/nexmark/NexmarkSuite.java   |  112 ++
 .../apache/beam/sdk/nexmark/NexmarkUtils.java   |  674 ++++++++++
 .../apache/beam/sdk/nexmark/model/Auction.java  |  187 +++
 .../beam/sdk/nexmark/model/AuctionBid.java      |   85 ++
 .../beam/sdk/nexmark/model/AuctionCount.java    |   84 ++
 .../beam/sdk/nexmark/model/AuctionPrice.java    |   88 ++
 .../org/apache/beam/sdk/nexmark/model/Bid.java  |  177 +++
 .../beam/sdk/nexmark/model/BidsPerSession.java  |   87 ++
 .../beam/sdk/nexmark/model/CategoryPrice.java   |   97 ++
 .../org/apache/beam/sdk/nexmark/model/Done.java |   80 ++
 .../apache/beam/sdk/nexmark/model/Event.java    |  171 +++
 .../beam/sdk/nexmark/model/IdNameReserve.java   |   98 ++
 .../beam/sdk/nexmark/model/KnownSize.java       |   26 +
 .../beam/sdk/nexmark/model/NameCityStateId.java |  103 ++
 .../apache/beam/sdk/nexmark/model/Person.java   |  163 +++
 .../beam/sdk/nexmark/model/SellerPrice.java     |   89 ++
 .../beam/sdk/nexmark/model/package-info.java    |   22 +
 .../apache/beam/sdk/nexmark/package-info.java   |   21 +
 .../sdk/nexmark/queries/AbstractSimulator.java  |  211 ++++
 .../beam/sdk/nexmark/queries/NexmarkQuery.java  |  270 ++++
 .../sdk/nexmark/queries/NexmarkQueryModel.java  |  118 ++
 .../apache/beam/sdk/nexmark/queries/Query0.java |   70 ++
 .../beam/sdk/nexmark/queries/Query0Model.java   |   64 +
 .../apache/beam/sdk/nexmark/queries/Query1.java |   67 +
 .../beam/sdk/nexmark/queries/Query10.java       |  367 ++++++
 .../beam/sdk/nexmark/queries/Query11.java       |   79 ++
 .../beam/sdk/nexmark/queries/Query12.java       |   80 ++
 .../beam/sdk/nexmark/queries/Query1Model.java   |   76 ++
 .../apache/beam/sdk/nexmark/queries/Query2.java |   79 ++
 .../beam/sdk/nexmark/queries/Query2Model.java   |   80 ++
 .../apache/beam/sdk/nexmark/queries/Query3.java |  301 +++++
 .../beam/sdk/nexmark/queries/Query3Model.java   |  124 ++
 .../apache/beam/sdk/nexmark/queries/Query4.java |  116 ++
 .../beam/sdk/nexmark/queries/Query4Model.java   |  186 +++
 .../apache/beam/sdk/nexmark/queries/Query5.java |  138 +++
 .../beam/sdk/nexmark/queries/Query5Model.java   |  176 +++
 .../apache/beam/sdk/nexmark/queries/Query6.java |  155 +++
 .../beam/sdk/nexmark/queries/Query6Model.java   |  133 ++
 .../apache/beam/sdk/nexmark/queries/Query7.java |   90 ++
 .../beam/sdk/nexmark/queries/Query7Model.java   |  130 ++
 .../apache/beam/sdk/nexmark/queries/Query8.java |   97 ++
 .../beam/sdk/nexmark/queries/Query8Model.java   |  148 +++
 .../apache/beam/sdk/nexmark/queries/Query9.java |   44 +
 .../beam/sdk/nexmark/queries/Query9Model.java   |   44 +
 .../beam/sdk/nexmark/queries/WinningBids.java   |  412 +++++++
 .../nexmark/queries/WinningBidsSimulator.java   |  206 ++++
 .../beam/sdk/nexmark/queries/package-info.java  |   22 +
 .../sdk/nexmark/sources/BoundedEventSource.java |  190 +++
 .../beam/sdk/nexmark/sources/Generator.java     |  609 +++++++++
 .../sdk/nexmark/sources/GeneratorConfig.java    |  298 +++++
 .../nexmark/sources/UnboundedEventSource.java   |  329 +++++
 .../beam/sdk/nexmark/sources/package-info.java  |   22 +
 .../nexmark/src/main/resources/log4j.properties |   55 +
 .../beam/sdk/nexmark/queries/QueryTest.java     |  185 +++
 .../nexmark/sources/BoundedEventSourceTest.java |   70 ++
 .../beam/sdk/nexmark/sources/GeneratorTest.java |  110 ++
 .../sources/UnboundedEventSourceTest.java       |  105 ++
 sdks/java/pom.xml                               |    1 +
 134 files changed, 11922 insertions(+), 12020 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
deleted file mode 100644
index a9acd63..0000000
--- a/integration/java/nexmark/README.md
+++ /dev/null
@@ -1,340 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-# NEXMark integration suite
-
-This is a suite of pipelines inspired by the 'continuous data stream'
-queries in
-[http://datalab.cs.pdx.edu/niagaraST/NEXMark/](http://datalab.cs.pdx.edu/niagaraST/NEXMark/).
-
-These are multiple queries over a three-entity model representing an online auction system:
-
- - **Person** represents a person submitting an item for auction and/or making a bid
-    on an auction.
- - **Auction** represents an item under auction.
- - **Bid** represents a bid for an item under auction.
-
-The queries exercise many aspects of the Beam model:
-
-* **Query1**: What are the bid values in Euros?
-  Illustrates a simple map.
-* **Query2**: What are the auctions with particular auction numbers?
-  Illustrates a simple filter.
-* **Query3**: Who is selling in particular US states?
-  Illustrates an incremental join (using per-key state and timer) and filter.
-* **Query4**: What is the average selling price for each auction
-  category?
-  Illustrates complex join (using custom window functions) and
-  aggregation.
-* **Query5**: Which auctions have seen the most bids in the last period?
-  Illustrates sliding windows and combiners.
-* **Query6**: What is the average selling price per seller for their
-  last 10 closed auctions?
-  Shares the same 'winning bids' core as for **Query4**, and
-  illustrates a specialized combiner.
-* **Query7**: What are the highest bids per period?
-  Deliberately implemented using a side input to illustrate fanout.
-* **Query8**: Who has entered the system and created an auction in
-  the last period?
-  Illustrates a simple join.
-
-We have augmented the original queries with five more:
-
-* **Query0**: Pass-through.
-  Allows us to measure the monitoring overhead.
-* **Query9**: Winning-bids.
-  A common sub-query shared by **Query4** and **Query6**.
-* **Query10**: Log all events to GCS files.
-  Illustrates windows with large side effects on firing.
-* **Query11**: How many bids did a user make in each session they
-  were active?
-  Illustrates session windows.
-* **Query12**: How many bids does a user make within a fixed
-  processing time limit?
-  Illustrates working in processing time in the Global window, as
-  compared with event time in non-Global windows for all the other
-  queries.
-
-We can specify the Beam runner to use with Maven profiles. The available profiles are:
-
-* direct-runner
-* spark-runner
-* flink-runner
-* apex-runner
-
-The runner must also be specified like in any other Beam pipeline using
-
-    --runner
-
-
-Test data is deterministically synthesized on demand. The test
-data may be synthesized in the same pipeline as the query itself,
-or may be published to Pubsub.
-
-The query results may be:
-
-* Published to Pubsub.
-* Written to text files as plain text.
-* Written to text files using an Avro encoding.
-* Sent to BigQuery.
-* Discarded.
-
-# Configuration
-
-## Common configuration parameters
-
-Decide if batch or streaming:
-
-    --streaming=true
-
-Number of event generators
-
-    --numEventGenerators=4
-
-Run query N
-
-    --query=N
-
-## Available Suites
-The suite to run can be chosen using this configuration parameter:
-
-    --suite=SUITE
-
-Available suites are:
-* DEFAULT: Test default configuration with query 0.
-* SMOKE: Run the 12 default configurations.
-* STRESS: Like SMOKE but for 1m events.
-* FULL_THROTTLE: Like SMOKE but for 100m events.
-
-   
-
-## Apex specific configuration
-
-    --manageResources=false --monitorJobs=false
-
-## Dataflow specific configuration
-
-    --manageResources=false --monitorJobs=true \
-    --enforceEncodability=false --enforceImmutability=false \
-    --project=<your project> \
-    --zone=<your zone> \
-    --workerMachineType=n1-highmem-8 \
-    --stagingLocation=<a gs path for staging> \
-    --runner=DataflowRunner \
-    --tempLocation=gs://talend-imejia/nexmark/temp/ \
-    --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
-    --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar
-
-## Direct specific configuration
-
-    --manageResources=false --monitorJobs=true \
-    --enforceEncodability=false --enforceImmutability=false
-
-## Flink specific configuration
-
-    --manageResources=false --monitorJobs=true \
-    --flinkMaster=local --parallelism=#numcores
-
-## Spark specific configuration
-
-    --manageResources=false --monitorJobs=true \
-    --sparkMaster=local \
-    -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
-
-# Current Status
-
-Open issues are tracked [here](https://github.com../../../../../issues):
-
-## Batch / Synthetic / Local
-
-| Query | Direct | Spark                                                        | Flink                                                      | Apex                                                         |
-| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
-|     0 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     1 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     2 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     3 | ok     | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok                                                         | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
-|     4 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     5 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     6 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     7 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     8 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|     9 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|    10 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|    11 | ok     | ok                                                           | ok                                                         | ok                                                           |
-|    12 | ok     | ok                                                           | ok                                                         | ok                                                           |
-
-## Streaming / Synthetic / Local
-
-| Query | Direct | Spark                                                        | Flink                                                      | Apex                                                         |
-| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
-|     0 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     1 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     2 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     3 | ok     | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
-|     4 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     5 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     6 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     7 | ok     | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     8 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|     9 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|    10 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|    11 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-|    12 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
-
-## Batch / Synthetic / Cluster
-
-TODO
-
-| Query | Dataflow                       | Spark                          | Flink                          | Apex                           |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
-|     0 |                                |                                |                                |                                |
-
-## Streaming / Synthetic / Cluster
-
-TODO
-
-| Query | Dataflow                       | Spark                          | Flink                          | Apex                           |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
-|     0 |                                |                                |                                |                                |
-
-# Running Nexmark
-
-## Running SMOKE suite on the DirectRunner (local)
-
-Batch Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
-
-Streaming Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
-
-
-## Running SMOKE suite on the SparkRunner (local)
-
-Batch Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
-
-Streaming Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
-
-
-## Running SMOKE suite on the FlinkRunner (local)
-
-Batch Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true  --flinkMaster=local"
-
-Streaming Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true  --flinkMaster=local"
-
-
-## Running SMOKE suite on the ApexRunner (local)
-
-Batch Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
-
-Streaming Mode
-
-    mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
-
-
-## Running SMOKE suite on Google Cloud Dataflow
-
-Building package
-
-    mvn clean package -Pdataflow-runner
-
-Submit to Google Dataflow service
-
-
-```
-java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
-  org.apache.beam.integration.nexmark.Main \
-  --runner=DataflowRunner \
-  --project=<your project> \
-  --zone=<your zone> \
-  --workerMachineType=n1-highmem-8 \
-  --stagingLocation=<a gs path for staging> \
-  --streaming=true \
-  --sourceType=PUBSUB \
-  --pubSubMode=PUBLISH_ONLY \
-  --pubsubTopic=<an existing Pubsub topic> \
-  --resourceNameMode=VERBATIM \
-  --manageResources=false \
-  --monitorJobs=false \
-  --numEventGenerators=64 \
-  --numWorkers=16 \
-  --maxNumWorkers=16 \
-  --suite=SMOKE \
-  --firstEventRate=100000 \
-  --nextEventRate=100000 \
-  --ratePeriodSec=3600 \
-  --isRateLimited=true \
-  --avgPersonByteSize=500 \
-  --avgAuctionByteSize=500 \
-  --avgBidByteSize=500 \
-  --probDelayedEvent=0.000001 \
-  --occasionalDelaySec=3600 \
-  --numEvents=0 \
-  --useWallclockEventTime=true \
-  --usePubsubPublishTime=true \
-  --experiments=enable_custom_pubsub_sink
-```
-
-```
-java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
-  org.apache.beam.integration.nexmark.Main \
-  --runner=DataflowRunner \
-  --project=<your project> \
-  --zone=<your zone> \
-  --workerMachineType=n1-highmem-8 \
-  --stagingLocation=<a gs path for staging> \
-  --streaming=true \
-  --sourceType=PUBSUB \
-  --pubSubMode=SUBSCRIBE_ONLY \
-  --pubsubSubscription=<an existing Pubsub subscription to above topic> \
-  --resourceNameMode=VERBATIM \
-  --manageResources=false \
-  --monitorJobs=false \
-  --numWorkers=64 \
-  --maxNumWorkers=64 \
-  --suite=SMOKE \
-  --usePubsubPublishTime=true \
-  --outputPath=<a gs path under which log files will be written> \
-  --windowSizeSec=600 \
-  --occasionalDelaySec=3600 \
-  --maxLogEvents=10000 \
-  --experiments=enable_custom_pubsub_source
-```
-
-## Running query 0 on a Spark cluster with yarn
-
-Building package
-
-    mvn clean package -Pspark-runner
-
-Submit to the cluster
-
-    spark-submit --master yarn-client --class org.apache.beam.integration.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true
-

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
deleted file mode 100644
index 664a410..0000000
--- a/integration/java/nexmark/pom.xml
+++ /dev/null
@@ -1,292 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-integration-java-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-integration-java-nexmark</artifactId>
-  <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
-  <packaging>jar</packaging>
-
-  <profiles>
-
-    <!--
-      The direct runner is available by default.
-      You can also include it on the classpath explicitly with -P direct-runner
-    -->
-    <profile>
-      <id>direct-runner</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-direct-java</artifactId>
-          <scope>runtime</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <!-- Include the Apache Apex runner with -P apex-runner -->
-    <profile>
-      <id>apex-runner</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-apex</artifactId>
-          <scope>runtime</scope>
-        </dependency>
-        <!--
-          Apex depends on httpclient version 4.3.5, but the project has a transitive dependency on httpclient
-          4.0.1 from google-http-client. The Apex dependency version is specified explicitly so that it gets picked
-          up. This can be removed when the project no longer has a dependency on a different httpclient version.
-        -->
-        <dependency>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-          <version>4.3.5</version>
-          <scope>runtime</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>commons-codec</groupId>
-              <artifactId>commons-codec</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <!-- Include the Apache Flink runner with -P flink-runner -->
-    <profile>
-      <id>flink-runner</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-flink_2.10</artifactId>
-          <scope>runtime</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <!-- Include the Apache Spark runner with -P spark-runner -->
-    <profile>
-      <id>spark-runner</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-spark</artifactId>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming_2.10</artifactId>
-          <version>${spark.version}</version>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-core_2.10</artifactId>
-          <version>${spark.version}</version>
-          <scope>runtime</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>org.slf4j</groupId>
-              <artifactId>jul-to-slf4j</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <!-- Include the Google Cloud Dataflow runner with -P dataflow-runner -->
-    <profile>
-      <id>dataflow-runner</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.beam</groupId>
-          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
-          <scope>runtime</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
-              <artifactSet>
-                <includes>
-                  <include>*:*</include>
-                </includes>
-              </artifactSet>
-              <filters>
-                <filter>
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>META-INF/*.SF</exclude>
-                    <exclude>META-INF/*.DSA</exclude>
-                    <exclude>META-INF/*.RSA</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- Avro plugin for automatic code generation -->
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>schemas</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>schema</goal>
-            </goals>
-            <configuration>
-              <sourceDirectory>${project.basedir}/src/main/</sourceDirectory>
-              <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- Coverage analysis for unit tests. -->
-      <plugin>
-        <groupId>org.jacoco</groupId>
-        <artifactId>jacoco-maven-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <!-- Java SDK -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-    </dependency>
-
-    <!-- IOs -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-bigquery</artifactId>
-    </dependency>
-
-    <!-- Extra libraries -->
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>joda-time</groupId>
-      <artifactId>joda-time</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>compile</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-core</artifactId>
-      <scope>compile</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-    </dependency>
-
-    <!-- Test -->
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
deleted file mode 100644
index 4c23651..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * An implementation of the 'NEXMark queries' for Beam.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of the Beam model.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- */
-public class Main<OptionT extends NexmarkOptions> {
-
-  /**
-   * Run every configuration of the selected suite and report the resulting performance.
-   */
-  void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
-    Instant start = Instant.now();
-    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
-    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
-    Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
-
-    boolean successful = true;
-    try {
-      // Run all the configurations.
-      for (NexmarkConfiguration configuration : configurations) {
-        NexmarkPerf perf = nexmarkLauncher.run(configuration);
-        if (perf != null) {
-          if (perf.errors == null || perf.errors.size() > 0) {
-            successful = false;
-          }
-          appendPerf(options.getPerfFilename(), configuration, perf);
-          actual.put(configuration, perf);
-          // Summarize what we've run so far.
-          saveSummary(null, configurations, actual, baseline, start);
-        }
-      }
-    } finally {
-      if (options.getMonitorJobs()) {
-        // Report overall performance.
-        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
-        saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
-      }
-    }
-
-    if (!successful) {
-      throw new RuntimeException("Execution was not successful");
-    }
-  }
-
-  /**
-   * Append the pair of {@code configuration} and {@code perf} to perf file.
-   */
-  private void appendPerf(
-      @Nullable String perfFilename, NexmarkConfiguration configuration,
-      NexmarkPerf perf) {
-    if (perfFilename == null) {
-      return;
-    }
-    List<String> lines = new ArrayList<>();
-    lines.add("");
-    lines.add(String.format("# %s", Instant.now()));
-    lines.add(String.format("# %s", configuration.toShortString()));
-    lines.add(configuration.toString());
-    lines.add(perf.toString());
-    try {
-      Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
-          StandardOpenOption.APPEND);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to write perf file: ", e);
-    }
-    NexmarkUtils.console("appended results to perf file %s.", perfFilename);
-  }
-
-  /**
-   * Load the baseline perf.
-   */
-  @Nullable
-  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
-      @Nullable String baselineFilename) {
-    if (baselineFilename == null) {
-      return null;
-    }
-    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
-    List<String> lines;
-    try {
-      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to read baseline perf file: ", e);
-    }
-    for (int i = 0; i < lines.size(); i++) {
-      if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
-        continue;
-      }
-      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
-      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
-      baseline.put(configuration, perf);
-    }
-    NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
-        baselineFilename);
-    return baseline;
-  }
-
-  private static final String LINE =
-      "==========================================================================================";
-
-  /**
-   * Print summary of {@code actual} vs (if non-null) {@code baseline}.
-   */
-  private static void saveSummary(
-      @Nullable String summaryFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
-    List<String> lines = new ArrayList<>();
-
-    lines.add("");
-    lines.add(LINE);
-
-    lines.add(
-        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
-    lines.add("");
-
-    lines.add("Default configuration:");
-    lines.add(NexmarkConfiguration.DEFAULT.toString());
-    lines.add("");
-
-    lines.add("Configurations:");
-    lines.add("  Conf  Description");
-    int conf = 0;
-    for (NexmarkConfiguration configuration : configurations) {
-      lines.add(String.format("  %04d  %s", conf++, configuration.toShortString()));
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf != null && actualPerf.jobId != null) {
-        lines.add(String.format("  %4s  [Ran as job %s]", "", actualPerf.jobId));
-      }
-    }
-
-    lines.add("");
-    lines.add("Performance:");
-    lines.add(String.format("  %4s  %12s  %12s  %12s  %12s  %12s  %12s", "Conf", "Runtime(sec)",
-        "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
-    conf = 0;
-    for (NexmarkConfiguration configuration : configurations) {
-      String line = String.format("  %04d  ", conf++);
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf == null) {
-        line += "*** not run ***";
-      } else {
-        NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
-        double runtimeSec = actualPerf.runtimeSec;
-        line += String.format("%12.1f  ", runtimeSec);
-        if (baselinePerf == null) {
-          line += String.format("%12s  ", "");
-        } else {
-          double baselineRuntimeSec = baselinePerf.runtimeSec;
-          double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0;
-          line += String.format("%+11.2f%%  ", diff);
-        }
-
-        double eventsPerSec = actualPerf.eventsPerSec;
-        line += String.format("%12.1f  ", eventsPerSec);
-        if (baselinePerf == null) {
-          line += String.format("%12s  ", "");
-        } else {
-          double baselineEventsPerSec = baselinePerf.eventsPerSec;
-          double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0;
-          line += String.format("%+11.2f%%  ", diff);
-        }
-
-        long numResults = actualPerf.numResults;
-        line += String.format("%12d  ", numResults);
-        if (baselinePerf == null) {
-          line += String.format("%12s", "");
-        } else {
-          long baselineNumResults = baselinePerf.numResults;
-          long diff = numResults - baselineNumResults;
-          line += String.format("%+12d", diff);
-        }
-      }
-      lines.add(line);
-
-      if (actualPerf != null) {
-        List<String> errors = actualPerf.errors;
-        if (errors == null) {
-          errors = new ArrayList<>();
-          errors.add("NexmarkGoogleRunner returned null errors list");
-        }
-        for (String error : errors) {
-          lines.add(String.format("  %4s  *** %s ***", "", error));
-        }
-      }
-    }
-
-    lines.add(LINE);
-    lines.add("");
-
-    for (String line : lines) {
-      System.out.println(line);
-    }
-
-    if (summaryFilename != null) {
-      try {
-        Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
-            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
-      } catch (IOException e) {
-        throw new RuntimeException("Unable to save summary file: ", e);
-      }
-      NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
-    }
-  }
-
-  /**
-   * Write all perf data and any baselines to a javascript file which can be used by
-   * a graphing page etc.
-   */
-  private static void saveJavascript(
-      @Nullable String javascriptFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
-    if (javascriptFilename == null) {
-      return;
-    }
-
-    List<String> lines = new ArrayList<>();
-    lines.add(String.format(
-        "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
-    lines.add("var all = [");
-
-    for (NexmarkConfiguration configuration : configurations) {
-      lines.add("  {");
-      lines.add(String.format("    config: %s", configuration));
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf != null) {
-        lines.add(String.format("    ,perf: %s", actualPerf));
-      }
-      NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
-      if (baselinePerf != null) {
-        lines.add(String.format("    ,baseline: %s", baselinePerf));
-      }
-      lines.add("  },");
-    }
-
-    lines.add("];");
-
-    try {
-      Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
-          StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to save javascript file: ", e);
-    }
-    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
-  }
-
-  public static void main(String[] args) {
-    NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
-      .withValidation()
-      .as(NexmarkOptions.class);
-    NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options);
-    new Main<>().runAll(options, nexmarkLauncher);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
deleted file mode 100644
index 2f0c56a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Distribution;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A monitor of elements with support for later retrieving their metrics.
- *
- * @param <T> Type of element we are monitoring.
- */
-public class Monitor<T extends KnownSize> implements Serializable {
-  private class MonitorDoFn extends DoFn<T, T> {
-    final Counter elementCounter =
-      Metrics.counter(name , prefix + ".elements");
-    final Counter bytesCounter =
-      Metrics.counter(name , prefix + ".bytes");
-    final Distribution startTime =
-      Metrics.distribution(name , prefix + ".startTime");
-    final Distribution endTime =
-      Metrics.distribution(name , prefix + ".endTime");
-    final Distribution startTimestamp =
-      Metrics.distribution(name , prefix + ".startTimestamp");
-    final Distribution endTimestamp =
-      Metrics.distribution(name , prefix + ".endTimestamp");
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      elementCounter.inc();
-      bytesCounter.inc(c.element().sizeInBytes());
-      long now = System.currentTimeMillis();
-      startTime.update(now);
-      endTime.update(now);
-      startTimestamp.update(c.timestamp().getMillis());
-      endTimestamp.update(c.timestamp().getMillis());
-      c.output(c.element());
-    }
-  }
-
-  public final String name;
-  public final String prefix;
-  private final MonitorDoFn doFn;
-  private final PTransform<PCollection<? extends T>, PCollection<T>> transform;
-
-  public Monitor(String name, String prefix) {
-    this.name = name;
-    this.prefix = prefix;
-    doFn = new MonitorDoFn();
-    transform = ParDo.of(doFn);
-  }
-
-  public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
-    return transform;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
deleted file mode 100644
index 2faf3f5..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ /dev/null
@@ -1,721 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * Configuration controlling how a query is run. May be supplied by command line or
- * programmatically. We only capture properties which may influence the resulting
- * pipeline performance, as captured by {@link NexmarkPerf}.
- */
-public class NexmarkConfiguration implements Serializable {
-  public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
-
-  /** If {@literal true}, include additional debugging and monitoring stats. */
-  @JsonProperty
-  public boolean debug = true;
-
-  /** Which query to run, in [0,12]. */
-  @JsonProperty
-  public int query = 0;
-
-  /** Where events come from. */
-  @JsonProperty
-  public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT;
-
-  /** Where results go to. */
-  @JsonProperty
-  public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
-
-  /**
-   * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
-   * into the overall query pipeline.
-   */
-  @JsonProperty
-  public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED;
-
-  /**
-   * Number of events to generate. If zero, generate as many as possible without overflowing
-   * internal counters etc.
-   */
-  @JsonProperty
-  public long numEvents = 100000;
-
-  /**
-   * Number of event generators to use. Each generates events in its own timeline.
-   */
-  @JsonProperty
-  public int numEventGenerators = 100;
-
-  /**
-   * Shape of event rate curve.
-   */
-  @JsonProperty
-  public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE;
-
-  /**
-   * Initial overall event rate (in {@link #rateUnit}).
-   */
-  @JsonProperty
-  public int firstEventRate = 10000;
-
-  /**
-   * Next overall event rate (in {@link #rateUnit}).
-   */
-  @JsonProperty
-  public int nextEventRate = 10000;
-
-  /**
-   * Unit for rates.
-   */
-  @JsonProperty
-  public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND;
-
-  /**
-   * Overall period of rate shape, in seconds.
-   */
-  @JsonProperty
-  public int ratePeriodSec = 600;
-
-  /**
-   * Time in seconds to preload the subscription with data, at the initial input rate of the
-   * pipeline.
-   */
-  @JsonProperty
-  public int preloadSeconds = 0;
-
-  /**
-   * Timeout for stream pipelines to stop in seconds.
-   */
-  @JsonProperty
-  public int streamTimeout = 240;
-
-  /**
-   * If true, and in streaming mode, generate events only when they are due according to their
-   * timestamp.
-   */
-  @JsonProperty
-  public boolean isRateLimited = false;
-
-  /**
-   * If true, use wallclock time as event time. Otherwise, use a deterministic
-   * time in the past so that multiple runs will see exactly the same event streams
-   * and should thus have exactly the same results.
-   */
-  @JsonProperty
-  public boolean useWallclockEventTime = false;
-
-  /** Average idealized size of a 'new person' event, in bytes. */
-  @JsonProperty
-  public int avgPersonByteSize = 200;
-
-  /** Average idealized size of a 'new auction' event, in bytes. */
-  @JsonProperty
-  public int avgAuctionByteSize = 500;
-
-  /** Average idealized size of a 'bid' event, in bytes. */
-  @JsonProperty
-  public int avgBidByteSize = 100;
-
-  /** Ratio of bids to 'hot' auctions compared to all other auctions. */
-  @JsonProperty
-  public int hotAuctionRatio = 2;
-
-  /** Ratio of auctions for 'hot' sellers compared to all other people. */
-  @JsonProperty
-  public int hotSellersRatio = 4;
-
-  /** Ratio of bids for 'hot' bidders compared to all other people. */
-  @JsonProperty
-  public int hotBiddersRatio = 4;
-
-  /** Window size, in seconds, for queries 3, 5, 7 and 8. */
-  @JsonProperty
-  public long windowSizeSec = 10;
-
-  /** Sliding window period, in seconds, for query 5. */
-  @JsonProperty
-  public long windowPeriodSec = 5;
-
-  /** Number of seconds to hold back events according to their reported timestamp. */
-  @JsonProperty
-  public long watermarkHoldbackSec = 0;
-
-  /** Average number of auctions which should be in flight at any time, per generator. */
-  @JsonProperty
-  public int numInFlightAuctions = 100;
-
-  /** Maximum number of people to consider as active for placing auctions or bids. */
-  @JsonProperty
-  public int numActivePeople = 1000;
-
-  /** Coder strategy to follow. */
-  @JsonProperty
-  public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND;
-
-  /**
-   * Delay, in milliseconds, for each event. This will peg one core for this number
-   * of milliseconds to simulate CPU-bound computation.
-   */
-  @JsonProperty
-  public long cpuDelayMs = 0;
-
-  /**
-   * Extra data, in bytes, to save to persistent state for each event. This will force
-   * i/o all the way to durable storage to simulate an I/O-bound computation.
-   */
-  @JsonProperty
-  public long diskBusyBytes = 0;
-
-  /**
-   * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction.
-   */
-  @JsonProperty
-  public int auctionSkip = 123;
-
-  /**
-   * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum).
-   */
-  @JsonProperty
-  public int fanout = 5;
-
-  /**
-   * Maximum waiting time to clean personState in query3
-   * (ie maximum waiting of the auctions related to person in state in seconds in event time).
-   */
-  @JsonProperty
-  public int maxAuctionsWaitingTime = 600;
-
-  /**
-   * Length of occasional delay to impose on events (in seconds).
-   */
-  @JsonProperty
-  public long occasionalDelaySec = 3;
-
-  /**
-   * Probability that an event will be delayed by delayS.
-   */
-  @JsonProperty
-  public double probDelayedEvent = 0.1;
-
-  /**
-   * Maximum size of each log file (in events). For Query10 only.
-   */
-  @JsonProperty
-  public int maxLogEvents = 100_000;
-
-  /**
-   * If true, use pub/sub publish time instead of event time.
-   */
-  @JsonProperty
-  public boolean usePubsubPublishTime = false;
-
-  /**
-   * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies
-   * every 1000 events per generator are emitted in pseudo-random order.
-   */
-  @JsonProperty
-  public long outOfOrderGroupSize = 1;
-
-  /**
-   * Replace any properties of this configuration which have been supplied by the command line.
-   */
-  public void overrideFromOptions(NexmarkOptions options) {
-    if (options.getDebug() != null) {
-      debug = options.getDebug();
-    }
-    if (options.getQuery() != null) {
-      query = options.getQuery();
-    }
-    if (options.getSourceType() != null) {
-      sourceType = options.getSourceType();
-    }
-    if (options.getSinkType() != null) {
-      sinkType = options.getSinkType();
-    }
-    if (options.getPubSubMode() != null) {
-      pubSubMode = options.getPubSubMode();
-    }
-    if (options.getNumEvents() != null) {
-      numEvents = options.getNumEvents();
-    }
-    if (options.getNumEventGenerators() != null) {
-      numEventGenerators = options.getNumEventGenerators();
-    }
-    if (options.getRateShape() != null) {
-      rateShape = options.getRateShape();
-    }
-    if (options.getFirstEventRate() != null) {
-      firstEventRate = options.getFirstEventRate();
-    }
-    if (options.getNextEventRate() != null) {
-      nextEventRate = options.getNextEventRate();
-    }
-    if (options.getRateUnit() != null) {
-      rateUnit = options.getRateUnit();
-    }
-    if (options.getRatePeriodSec() != null) {
-      ratePeriodSec = options.getRatePeriodSec();
-    }
-    if (options.getPreloadSeconds() != null) {
-      preloadSeconds = options.getPreloadSeconds();
-    }
-    if (options.getStreamTimeout() != null) {
-      streamTimeout = options.getStreamTimeout();
-    }
-    if (options.getIsRateLimited() != null) {
-      isRateLimited = options.getIsRateLimited();
-    }
-    if (options.getUseWallclockEventTime() != null) {
-      useWallclockEventTime = options.getUseWallclockEventTime();
-    }
-    if (options.getAvgPersonByteSize() != null) {
-      avgPersonByteSize = options.getAvgPersonByteSize();
-    }
-    if (options.getAvgAuctionByteSize() != null) {
-      avgAuctionByteSize = options.getAvgAuctionByteSize();
-    }
-    if (options.getAvgBidByteSize() != null) {
-      avgBidByteSize = options.getAvgBidByteSize();
-    }
-    if (options.getHotAuctionRatio() != null) {
-      hotAuctionRatio = options.getHotAuctionRatio();
-    }
-    if (options.getHotSellersRatio() != null) {
-      hotSellersRatio = options.getHotSellersRatio();
-    }
-    if (options.getHotBiddersRatio() != null) {
-      hotBiddersRatio = options.getHotBiddersRatio();
-    }
-    if (options.getWindowSizeSec() != null) {
-      windowSizeSec = options.getWindowSizeSec();
-    }
-    if (options.getWindowPeriodSec() != null) {
-      windowPeriodSec = options.getWindowPeriodSec();
-    }
-    if (options.getWatermarkHoldbackSec() != null) {
-      watermarkHoldbackSec = options.getWatermarkHoldbackSec();
-    }
-    if (options.getNumInFlightAuctions() != null) {
-      numInFlightAuctions = options.getNumInFlightAuctions();
-    }
-    if (options.getNumActivePeople() != null) {
-      numActivePeople = options.getNumActivePeople();
-    }
-    if (options.getCoderStrategy() != null) {
-      coderStrategy = options.getCoderStrategy();
-    }
-    if (options.getCpuDelayMs() != null) {
-      cpuDelayMs = options.getCpuDelayMs();
-    }
-    if (options.getDiskBusyBytes() != null) {
-      diskBusyBytes = options.getDiskBusyBytes();
-    }
-    if (options.getAuctionSkip() != null) {
-      auctionSkip = options.getAuctionSkip();
-    }
-    if (options.getFanout() != null) {
-      fanout = options.getFanout();
-    }
-    if (options.getMaxAuctionsWaitingTime() != null) {
-      fanout = options.getMaxAuctionsWaitingTime();
-    }
-    if (options.getOccasionalDelaySec() != null) {
-      occasionalDelaySec = options.getOccasionalDelaySec();
-    }
-    if (options.getProbDelayedEvent() != null) {
-      probDelayedEvent = options.getProbDelayedEvent();
-    }
-    if (options.getMaxLogEvents() != null) {
-      maxLogEvents = options.getMaxLogEvents();
-    }
-    if (options.getUsePubsubPublishTime() != null) {
-      usePubsubPublishTime = options.getUsePubsubPublishTime();
-    }
-    if (options.getOutOfOrderGroupSize() != null) {
-      outOfOrderGroupSize = options.getOutOfOrderGroupSize();
-    }
-  }
-
-  /**
-   * Create a new configuration carrying the same value in every field as this one.
-   *
-   * @return a fresh {@code NexmarkConfiguration} whose fields were assigned from this instance
-   */
-  public NexmarkConfiguration copy() {
-    NexmarkConfiguration duplicate = new NexmarkConfiguration();
-
-    // Query selection and I/O wiring.
-    duplicate.debug = debug;
-    duplicate.query = query;
-    duplicate.sourceType = sourceType;
-    duplicate.sinkType = sinkType;
-    duplicate.pubSubMode = pubSubMode;
-
-    // Event volume and rate.
-    duplicate.numEvents = numEvents;
-    duplicate.numEventGenerators = numEventGenerators;
-    duplicate.rateShape = rateShape;
-    duplicate.firstEventRate = firstEventRate;
-    duplicate.nextEventRate = nextEventRate;
-    duplicate.rateUnit = rateUnit;
-    duplicate.ratePeriodSec = ratePeriodSec;
-    duplicate.preloadSeconds = preloadSeconds;
-    duplicate.streamTimeout = streamTimeout;
-    duplicate.isRateLimited = isRateLimited;
-    duplicate.useWallclockEventTime = useWallclockEventTime;
-
-    // Event sizes and 'hot' ratios.
-    duplicate.avgPersonByteSize = avgPersonByteSize;
-    duplicate.avgAuctionByteSize = avgAuctionByteSize;
-    duplicate.avgBidByteSize = avgBidByteSize;
-    duplicate.hotAuctionRatio = hotAuctionRatio;
-    duplicate.hotSellersRatio = hotSellersRatio;
-    duplicate.hotBiddersRatio = hotBiddersRatio;
-
-    // Windowing and generator state.
-    duplicate.windowSizeSec = windowSizeSec;
-    duplicate.windowPeriodSec = windowPeriodSec;
-    duplicate.watermarkHoldbackSec = watermarkHoldbackSec;
-    duplicate.numInFlightAuctions = numInFlightAuctions;
-    duplicate.numActivePeople = numActivePeople;
-
-    // Remaining tuning knobs.
-    duplicate.coderStrategy = coderStrategy;
-    duplicate.cpuDelayMs = cpuDelayMs;
-    duplicate.diskBusyBytes = diskBusyBytes;
-    duplicate.auctionSkip = auctionSkip;
-    duplicate.fanout = fanout;
-    duplicate.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
-    duplicate.occasionalDelaySec = occasionalDelaySec;
-    duplicate.probDelayedEvent = probDelayedEvent;
-    duplicate.maxLogEvents = maxLogEvents;
-    duplicate.usePubsubPublishTime = usePubsubPublishTime;
-    duplicate.outOfOrderGroupSize = outOfOrderGroupSize;
-
-    return duplicate;
-  }
-
-  /**
-   * Return short description of configuration (suitable for use in logging).
-   *
-   * <p>The query is always rendered. Every other field is rendered only when its value differs
-   * from the one in {@code DEFAULT}; {@code firstEventRate} and {@code nextEventRate} are
-   * rendered together when either of them differs.
-   *
-   * @return a {@code "; "}-separated list of {@code name:value} pairs
-   */
-  public String toShortString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(String.format("query:%d", query));
-    if (debug != DEFAULT.debug) {
-      sb.append(String.format("; debug:%s", debug));
-    }
-    if (sourceType != DEFAULT.sourceType) {
-      sb.append(String.format("; sourceType:%s", sourceType));
-    }
-    if (sinkType != DEFAULT.sinkType) {
-      sb.append(String.format("; sinkType:%s", sinkType));
-    }
-    if (pubSubMode != DEFAULT.pubSubMode) {
-      sb.append(String.format("; pubSubMode:%s", pubSubMode));
-    }
-    if (numEvents != DEFAULT.numEvents) {
-      sb.append(String.format("; numEvents:%d", numEvents));
-    }
-    if (numEventGenerators != DEFAULT.numEventGenerators) {
-      sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
-    }
-    if (rateShape != DEFAULT.rateShape) {
-      sb.append(String.format("; rateShape:%s", rateShape));
-    }
-    // The two rates are only meaningful as a pair, so both are shown if either changed.
-    if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) {
-      sb.append(String.format("; firstEventRate:%d", firstEventRate));
-      sb.append(String.format("; nextEventRate:%d", nextEventRate));
-    }
-    if (rateUnit != DEFAULT.rateUnit) {
-      sb.append(String.format("; rateUnit:%s", rateUnit));
-    }
-    if (ratePeriodSec != DEFAULT.ratePeriodSec) {
-      sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec));
-    }
-    if (preloadSeconds != DEFAULT.preloadSeconds) {
-      sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
-    }
-    if (streamTimeout != DEFAULT.streamTimeout) {
-      sb.append(String.format("; streamTimeout:%d", streamTimeout));
-    }
-    if (isRateLimited != DEFAULT.isRateLimited) {
-      sb.append(String.format("; isRateLimited:%s", isRateLimited));
-    }
-    if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
-      sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
-    }
-    if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
-      sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
-    }
-    if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
-      sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
-    }
-    if (avgBidByteSize != DEFAULT.avgBidByteSize) {
-      sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
-    }
-    if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
-      sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
-    }
-    if (hotSellersRatio != DEFAULT.hotSellersRatio) {
-      sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
-    }
-    if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
-      sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
-    }
-    if (windowSizeSec != DEFAULT.windowSizeSec) {
-      sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
-    }
-    if (windowPeriodSec != DEFAULT.windowPeriodSec) {
-      sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
-    }
-    if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
-      sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
-    }
-    if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
-      sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
-    }
-    if (numActivePeople != DEFAULT.numActivePeople) {
-      sb.append(String.format("; numActivePeople:%d", numActivePeople));
-    }
-    if (coderStrategy != DEFAULT.coderStrategy) {
-      sb.append(String.format("; coderStrategy:%s", coderStrategy));
-    }
-    if (cpuDelayMs != DEFAULT.cpuDelayMs) {
-      // NOTE(review): label differs from the field name (cpuDelayMs); kept unchanged in case
-      // log consumers depend on it -- confirm before renaming.
-      sb.append(String.format("; cpuSlowdownMs:%d", cpuDelayMs));
-    }
-    if (diskBusyBytes != DEFAULT.diskBusyBytes) {
-      sb.append(String.format("; diskBusyBytes:%d", diskBusyBytes));
-    }
-    if (auctionSkip != DEFAULT.auctionSkip) {
-      sb.append(String.format("; auctionSkip:%d", auctionSkip));
-    }
-    if (fanout != DEFAULT.fanout) {
-      sb.append(String.format("; fanout:%d", fanout));
-    }
-    if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
-      // Render the waiting time itself, not the fanout.
-      sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
-    }
-    if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
-      sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
-    }
-    if (probDelayedEvent != DEFAULT.probDelayedEvent) {
-      sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
-    }
-    if (maxLogEvents != DEFAULT.maxLogEvents) {
-      sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
-    }
-    if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
-      sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
-    }
-    if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
-      sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
-    }
-    return sb.toString();
-  }
-
-  /**
-   * Render the complete configuration by serializing this object with
-   * {@code NexmarkUtils.MAPPER}.
-   *
-   * @return the serialized form of this configuration
-   * @throws RuntimeException wrapping the {@code JsonProcessingException} if serialization fails
-   */
-  @Override
-  public String toString() {
-    final String rendered;
-    try {
-      rendered = NexmarkUtils.MAPPER.writeValueAsString(this);
-    } catch (JsonProcessingException cause) {
-      throw new RuntimeException(cause);
-    }
-    return rendered;
-  }
-
-  /**
-   * Parse a configuration from {@code string} using {@code NexmarkUtils.MAPPER}.
-   *
-   * @param string the serialized configuration to parse
-   * @return the parsed configuration
-   * @throws RuntimeException wrapping the underlying {@link IOException} if parsing fails
-   */
-  public static NexmarkConfiguration fromString(String string) {
-    try {
-      return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
-    } catch (IOException e) {
-      // Include the offending input so the failure can be diagnosed from the message alone.
-      throw new RuntimeException("Unable to parse nexmark configuration: " + string, e);
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(
-        debug,
-        query,
-        sourceType,
-        sinkType,
-        pubSubMode,
-        numEvents,
-        numEventGenerators,
-        rateShape,
-        firstEventRate,
-        nextEventRate,
-        rateUnit,
-        ratePeriodSec,
-        preloadSeconds,
-        streamTimeout,
-        isRateLimited,
-        useWallclockEventTime,
-        avgPersonByteSize,
-        avgAuctionByteSize,
-        avgBidByteSize,
-        hotAuctionRatio,
-        hotSellersRatio,
-        hotBiddersRatio,
-        windowSizeSec,
-        windowPeriodSec,
-        watermarkHoldbackSec,
-        numInFlightAuctions,
-        numActivePeople,
-        coderStrategy,
-        cpuDelayMs,
-        diskBusyBytes,
-        auctionSkip,
-        fanout,
-        maxAuctionsWaitingTime,
-        occasionalDelaySec,
-        probDelayedEvent,
-        maxLogEvents,
-        usePubsubPublishTime,
-        outOfOrderGroupSize);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    NexmarkConfiguration other = (NexmarkConfiguration) obj;
-    if (debug != other.debug) {
-      return false;
-    }
-    if (auctionSkip != other.auctionSkip) {
-      return false;
-    }
-    if (avgAuctionByteSize != other.avgAuctionByteSize) {
-      return false;
-    }
-    if (avgBidByteSize != other.avgBidByteSize) {
-      return false;
-    }
-    if (avgPersonByteSize != other.avgPersonByteSize) {
-      return false;
-    }
-    if (coderStrategy != other.coderStrategy) {
-      return false;
-    }
-    if (cpuDelayMs != other.cpuDelayMs) {
-      return false;
-    }
-    if (diskBusyBytes != other.diskBusyBytes) {
-      return false;
-    }
-    if (fanout != other.fanout) {
-      return false;
-    }
-    if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
-      return false;
-    }
-    if (firstEventRate != other.firstEventRate) {
-      return false;
-    }
-    if (hotAuctionRatio != other.hotAuctionRatio) {
-      return false;
-    }
-    if (hotBiddersRatio != other.hotBiddersRatio) {
-      return false;
-    }
-    if (hotSellersRatio != other.hotSellersRatio) {
-      return false;
-    }
-    if (isRateLimited != other.isRateLimited) {
-      return false;
-    }
-    if (maxLogEvents != other.maxLogEvents) {
-      return false;
-    }
-    if (nextEventRate != other.nextEventRate) {
-      return false;
-    }
-    if (rateUnit != other.rateUnit) {
-      return false;
-    }
-    if (numEventGenerators != other.numEventGenerators) {
-      return false;
-    }
-    if (numEvents != other.numEvents) {
-      return false;
-    }
-    if (numInFlightAuctions != other.numInFlightAuctions) {
-      return false;
-    }
-    if (numActivePeople != other.numActivePeople) {
-      return false;
-    }
-    if (occasionalDelaySec != other.occasionalDelaySec) {
-      return false;
-    }
-    if (preloadSeconds != other.preloadSeconds) {
-      return false;
-    }
-    if (streamTimeout != other.streamTimeout) {
-      return false;
-    }
-    if (Double.doubleToLongBits(probDelayedEvent)
-        != Double.doubleToLongBits(other.probDelayedEvent)) {
-      return false;
-    }
-    if (pubSubMode != other.pubSubMode) {
-      return false;
-    }
-    if (ratePeriodSec != other.ratePeriodSec) {
-      return false;
-    }
-    if (rateShape != other.rateShape) {
-      return false;
-    }
-    if (query != other.query) {
-      return false;
-    }
-    if (sinkType != other.sinkType) {
-      return false;
-    }
-    if (sourceType != other.sourceType) {
-      return false;
-    }
-    if (useWallclockEventTime != other.useWallclockEventTime) {
-      return false;
-    }
-    if (watermarkHoldbackSec != other.watermarkHoldbackSec) {
-      return false;
-    }
-    if (windowPeriodSec != other.windowPeriodSec) {
-      return false;
-    }
-    if (windowSizeSec != other.windowSizeSec) {
-      return false;
-    }
-    if (usePubsubPublishTime != other.usePubsubPublishTime) {
-      return false;
-    }
-    if (outOfOrderGroupSize != other.outOfOrderGroupSize) {
-      return false;
-    }
-    return true;
-  }
-}