You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:54 UTC

[46/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
deleted file mode 100644
index b0c3853..0000000
--- a/integration/java/pom.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-integration-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-integration-java-parent</artifactId>
-  <packaging>pom</packaging>
-  <name>Apache Beam :: Integration Tests :: Java</name>
-
-  <modules>
-    <module>nexmark</module>
-  </modules>
-
-</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
deleted file mode 100644
index 4254819..0000000
--- a/integration/pom.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-parent</artifactId>
-    <version>2.1.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-integration-parent</artifactId>
-  <packaging>pom</packaging>
-  <name>Apache Beam :: Integration Tests</name>
-
-  <profiles>
-    <profile>
-      <id>release</id>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>findbugs-maven-plugin</artifactId>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-  <modules>
-    <module>java</module>
-  </modules>
-
-</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bddbf1f..5fd1297 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,6 @@
     <module>sdks</module>
     <module>runners</module>
     <module>examples</module>
-    <module>integration</module>
     <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
     <module>sdks/java/javadoc</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/README.md b/sdks/java/nexmark/README.md
new file mode 100644
index 0000000..a9acd63
--- /dev/null
+++ b/sdks/java/nexmark/README.md
@@ -0,0 +1,340 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# NEXMark integration suite
+
+This is a suite of pipelines inspired by the 'continuous data stream'
+queries in
+[http://datalab.cs.pdx.edu/niagaraST/NEXMark/](http://datalab.cs.pdx.edu/niagaraST/NEXMark/).
+
+These are multiple queries over a three-entity model representing an online auction system:
+
+ - **Person** represents a person submitting an item for auction and/or making a bid
+    on an auction.
+ - **Auction** represents an item under auction.
+ - **Bid** represents a bid for an item under auction.
+
+The queries exercise many aspects of the Beam model:
+
+* **Query1**: What are the bid values in Euros?
+  Illustrates a simple map.
+* **Query2**: What are the auctions with particular auction numbers?
+  Illustrates a simple filter.
+* **Query3**: Who is selling in particular US states?
+  Illustrates an incremental join (using per-key state and timer) and filter.
+* **Query4**: What is the average selling price for each auction
+  category?
+  Illustrates complex join (using custom window functions) and
+  aggregation.
+* **Query5**: Which auctions have seen the most bids in the last period?
+  Illustrates sliding windows and combiners.
+* **Query6**: What is the average selling price per seller for their
+  last 10 closed auctions?
+  Shares the same 'winning bids' core as for **Query4**, and
+  illustrates a specialized combiner.
+* **Query7**: What are the highest bids per period?
+  Deliberately implemented using a side input to illustrate fanout.
+* **Query8**: Who has entered the system and created an auction in
+  the last period?
+  Illustrates a simple join.
+
+We have augmented the original queries with five more:
+
+* **Query0**: Pass-through.
+  Allows us to measure the monitoring overhead.
+* **Query9**: Winning-bids.
+  A common sub-query shared by **Query4** and **Query6**.
+* **Query10**: Log all events to GCS files.
+  Illustrates windows with large side effects on firing.
+* **Query11**: How many bids did a user make in each session they
+  were active?
+  Illustrates session windows.
+* **Query12**: How many bids does a user make within a fixed
+  processing time limit?
+  Illustrates working in processing time in the Global window, as
+  compared with event time in non-Global windows for all the other
+  queries.
+
+We can specify the Beam runner to use with Maven profiles. The available profiles are:
+
+* direct-runner
+* spark-runner
+* flink-runner
+* apex-runner
+
+The runner must also be specified like in any other Beam pipeline using
+
+    --runner
+
+
+Test data is deterministically synthesized on demand. The test
+data may be synthesized in the same pipeline as the query itself,
+or may be published to Pubsub.
+
+The query results may be:
+
+* Published to Pubsub.
+* Written to text files as plain text.
+* Written to text files using an Avro encoding.
+* Sent to BigQuery.
+* Discarded.
+
+# Configuration
+
+## Common configuration parameters
+
+Decide if batch or streaming:
+
+    --streaming=true
+
+Number of event generators
+
+    --numEventGenerators=4
+
+Run query N
+
+    --query=N
+
+## Available Suites
+The suite to run can be chosen using this configuration parameter:
+
+    --suite=SUITE
+
+Available suites are:
+* DEFAULT: Test default configuration with query 0.
+* SMOKE: Run the 12 default configurations.
+* STRESS: Like SMOKE but for 1m events.
+* FULL_THROTTLE: Like SMOKE but 100m events.
+
+   
+
+## Apex specific configuration
+
+    --manageResources=false --monitorJobs=false
+
+## Dataflow specific configuration
+
+    --manageResources=false --monitorJobs=true \
+    --enforceEncodability=false --enforceImmutability=false \
+    --project=<your project> \
+    --zone=<your zone> \
+    --workerMachineType=n1-highmem-8 \
+    --stagingLocation=<a gs path for staging> \
+    --runner=DataflowRunner \
+    --tempLocation=gs://talend-imejia/nexmark/temp/ \
+    --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
+    --filesToStage=target/beam-sdks-java-nexmark-2.1.0-SNAPSHOT.jar
+
+## Direct specific configuration
+
+    --manageResources=false --monitorJobs=true \
+    --enforceEncodability=false --enforceImmutability=false
+
+## Flink specific configuration
+
+    --manageResources=false --monitorJobs=true \
+    --flinkMaster=local --parallelism=#numcores
+
+## Spark specific configuration
+
+    --manageResources=false --monitorJobs=true \
+    --sparkMaster=local \
+    -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
+
+# Current Status
+
+Open issues are tracked [here](https://issues.apache.org/jira/browse/BEAM):
+
+## Batch / Synthetic / Local
+
+| Query | Direct | Spark                                                        | Flink                                                      | Apex                                                         |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+|     0 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     1 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     2 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     3 | ok     | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok                                                         | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+|     4 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     5 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     6 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     7 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     8 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|     9 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|    10 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|    11 | ok     | ok                                                           | ok                                                         | ok                                                           |
+|    12 | ok     | ok                                                           | ok                                                         | ok                                                           |
+
+## Streaming / Synthetic / Local
+
+| Query | Direct | Spark                                                        | Flink                                                      | Apex                                                         |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+|     0 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     1 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     2 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     3 | ok     | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+|     4 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     5 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     6 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     7 | ok     | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     8 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|     9 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|    10 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|    11 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+|    12 | ok     | ok                                                           | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok                                                           |
+
+## Batch / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow                       | Spark                          | Flink                          | Apex                           |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+|     0 |                                |                                |                                |                                |
+
+## Streaming / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow                       | Spark                          | Flink                          | Apex                           |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+|     0 |                                |                                |                                |                                |
+
+# Running Nexmark
+
+## Running SMOKE suite on the DirectRunner (local)
+
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
+
+## Running SMOKE suite on the SparkRunner (local)
+
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
+
+
+## Running SMOKE suite on the FlinkRunner (local)
+
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true  --flinkMaster=local"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true  --flinkMaster=local"
+
+
+## Running SMOKE suite on the ApexRunner (local)
+
+Batch Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
+
+Streaming Mode
+
+    mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
+
+
+## Running SMOKE suite on Google Cloud Dataflow
+
+Building package
+
+    mvn clean package -Pdataflow-runner
+
+Submit to Google Dataflow service
+
+
+```
+java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+  org.apache.beam.sdk.nexmark.Main \
+  --runner=DataflowRunner \
+  --project=<your project> \
+  --zone=<your zone> \
+  --workerMachineType=n1-highmem-8 \
+  --stagingLocation=<a gs path for staging> \
+  --streaming=true \
+  --sourceType=PUBSUB \
+  --pubSubMode=PUBLISH_ONLY \
+  --pubsubTopic=<an existing Pubsub topic> \
+  --resourceNameMode=VERBATIM \
+  --manageResources=false \
+  --monitorJobs=false \
+  --numEventGenerators=64 \
+  --numWorkers=16 \
+  --maxNumWorkers=16 \
+  --suite=SMOKE \
+  --firstEventRate=100000 \
+  --nextEventRate=100000 \
+  --ratePeriodSec=3600 \
+  --isRateLimited=true \
+  --avgPersonByteSize=500 \
+  --avgAuctionByteSize=500 \
+  --avgBidByteSize=500 \
+  --probDelayedEvent=0.000001 \
+  --occasionalDelaySec=3600 \
+  --numEvents=0 \
+  --useWallclockEventTime=true \
+  --usePubsubPublishTime=true \
+  --experiments=enable_custom_pubsub_sink
+```
+
+```
+java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+  org.apache.beam.sdk.nexmark.Main \
+  --runner=DataflowRunner \
+  --project=<your project> \
+  --zone=<your zone> \
+  --workerMachineType=n1-highmem-8 \
+  --stagingLocation=<a gs path for staging> \
+  --streaming=true \
+  --sourceType=PUBSUB \
+  --pubSubMode=SUBSCRIBE_ONLY \
+  --pubsubSubscription=<an existing Pubsub subscription to above topic> \
+  --resourceNameMode=VERBATIM \
+  --manageResources=false \
+  --monitorJobs=false \
+  --numWorkers=64 \
+  --maxNumWorkers=64 \
+  --suite=SMOKE \
+  --usePubsubPublishTime=true \
+  --outputPath=<a gs path under which log files will be written> \
+  --windowSizeSec=600 \
+  --occasionalDelaySec=3600 \
+  --maxLogEvents=10000 \
+  --experiments=enable_custom_pubsub_source
+```
+
+## Running query 0 on a Spark cluster with yarn
+
+Building package
+
+    mvn clean package -Pspark-runner
+
+Submit to the cluster
+
+    spark-submit --master yarn-client --class org.apache.beam.sdk.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true
+

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
new file mode 100644
index 0000000..c1b6025
--- /dev/null
+++ b/sdks/java/nexmark/pom.xml
@@ -0,0 +1,292 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-parent</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-nexmark</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Nexmark</name>
+  <packaging>jar</packaging>
+
+  <profiles>
+
+    <!--
+      The direct runner is available by default.
+      You can also include it on the classpath explicitly with -P direct-runner
+    -->
+    <profile>
+      <id>direct-runner</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-direct-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Apache Apex runner with -P apex-runner -->
+    <profile>
+      <id>apex-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-apex</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+        <!--
+          Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
+          google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
+          can be removed when the project no longer has a dependency on a different httpclient version.
+        -->
+        <dependency>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+          <version>4.3.5</version>
+          <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>commons-codec</groupId>
+              <artifactId>commons-codec</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Apache Flink runner with -P flink-runner -->
+    <profile>
+      <id>flink-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-flink_2.10</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Apache Spark runner with -P spark-runner -->
+    <profile>
+      <id>spark-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-spark</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
+          <scope>runtime</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.slf4j</groupId>
+              <artifactId>jul-to-slf4j</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Include the Google Cloud Dataflow runner with -P dataflow-runner -->
+    <profile>
+      <id>dataflow-runner</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <artifactSet>
+                <includes>
+                  <include>*:*</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Avro plugin for automatic code generation -->
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>schemas</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/src/main/</sourceDirectory>
+              <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Java SDK -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <!-- IOs -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+    </dependency>
+
+    <!-- Extra libraries -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+    </dependency>
+
+    <!-- Test -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
new file mode 100644
index 0000000..ab2284c
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of the 'NEXMark queries' for Beam.
+ * These are multiple queries over a three table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of the Beam model.
+ *
+ * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+public class Main<OptionT extends NexmarkOptions> {
+
+  /**
+   * Runs every configuration of the suite selected by {@code options} through
+   * {@code nexmarkLauncher}. Each non-null result is appended to the perf file and a running
+   * summary is printed after it.
+   *
+   * <p>When {@code options.getMonitorJobs()} is true, the summary file and javascript file are
+   * written once all runs finish, even if a run threw.
+   *
+   * @throws RuntimeException if any returned result carried errors or a null error list
+   */
+  void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
+    Instant start = Instant.now();
+    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
+    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
+    Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
+
+    boolean failed = false;
+    try {
+      for (NexmarkConfiguration configuration : configurations) {
+        NexmarkPerf perf = nexmarkLauncher.run(configuration);
+        if (perf == null) {
+          // The launcher returned no perf result; there is nothing to record.
+          continue;
+        }
+        if (perf.errors == null || !perf.errors.isEmpty()) {
+          failed = true;
+        }
+        appendPerf(options.getPerfFilename(), configuration, perf);
+        actual.put(configuration, perf);
+        // A null filename means the running summary is only printed, not written to a file.
+        saveSummary(null, configurations, actual, baseline, start);
+      }
+    } finally {
+      if (options.getMonitorJobs()) {
+        // Report overall performance, including for configurations which did not run.
+        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
+        saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
+      }
+    }
+
+    if (failed) {
+      throw new RuntimeException("Execution was not successful");
+    }
+  }
+
+  /**
+   * Appends a record for {@code configuration} and {@code perf} to the perf file: a blank line,
+   * two {@code #} comment lines (current time and short description), then the configuration
+   * and perf strings. Does nothing when {@code perfFilename} is null.
+   *
+   * @throws RuntimeException if the file cannot be written
+   */
+  private void appendPerf(
+      @Nullable String perfFilename, NexmarkConfiguration configuration,
+      NexmarkPerf perf) {
+    if (perfFilename != null) {
+      List<String> record = new ArrayList<>();
+      record.add("");
+      record.add(String.format("# %s", Instant.now()));
+      record.add(String.format("# %s", configuration.toShortString()));
+      record.add(configuration.toString());
+      record.add(perf.toString());
+      try {
+        // CREATE + APPEND: the file is created on first use and grows on later runs.
+        Files.write(
+            Paths.get(perfFilename),
+            record,
+            StandardCharsets.UTF_8,
+            StandardOpenOption.CREATE,
+            StandardOpenOption.APPEND);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to write perf file: ", e);
+      }
+      NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+    }
+  }
+
+  /**
+   * Load the baseline perf.
+   *
+   * <p>Lines starting with {@code #} and blank lines are skipped. Every other line is parsed
+   * as a configuration and must be immediately followed by its perf line.
+   *
+   * @param baselineFilename file to read, or null if there is no baseline
+   * @return the baseline entries in file order, or null if {@code baselineFilename} is null
+   * @throws RuntimeException if the file cannot be read, or if it ends with a configuration
+   *     line that has no perf line after it
+   */
+  @Nullable
+  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+      @Nullable String baselineFilename) {
+    if (baselineFilename == null) {
+      return null;
+    }
+    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+    List<String> lines;
+    try {
+      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read baseline perf file: ", e);
+    }
+    for (int i = 0; i < lines.size(); i++) {
+      String line = lines.get(i);
+      if (line.startsWith("#") || line.trim().isEmpty()) {
+        continue;
+      }
+      // A truncated file would otherwise surface as a bare IndexOutOfBoundsException.
+      if (i + 1 >= lines.size()) {
+        throw new RuntimeException(
+            String.format(
+                "Baseline perf file %s is truncated: configuration on line %d has no perf line",
+                baselineFilename, i + 1));
+      }
+      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(line);
+      // The perf line is consumed here so the loop increment moves on to the next record.
+      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(++i));
+      baseline.put(configuration, perf);
+    }
+    NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+        baselineFilename);
+    return baseline;
+  }
+
+  /** Horizontal rule added as the first and last visible line of the printed summary. */
+  private static final String LINE =
+      "==========================================================================================";
+
+  /**
+   * Prints a summary of {@code actual} vs (if non-null) {@code baseline} to standard output
+   * and, if {@code summaryFilename} is non-null, appends the same lines to that file.
+   *
+   * @throws RuntimeException if the summary file cannot be written
+   */
+  private static void saveSummary(
+      @Nullable String summaryFilename,
+      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+    List<String> report = new ArrayList<>();
+
+    // Header: timing of the run and the default configuration for reference.
+    report.add("");
+    report.add(LINE);
+    report.add(
+        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+    report.add("");
+    report.add("Default configuration:");
+    report.add(NexmarkConfiguration.DEFAULT.toString());
+    report.add("");
+
+    // One numbered entry per configuration, plus the job id when one was recorded.
+    report.add("Configurations:");
+    report.add("  Conf  Description");
+    int index = 0;
+    for (NexmarkConfiguration configuration : configurations) {
+      report.add(String.format("  %04d  %s", index, configuration.toShortString()));
+      index++;
+      NexmarkPerf ran = actual.get(configuration);
+      if (ran != null && ran.jobId != null) {
+        report.add(String.format("  %4s  [Ran as job %s]", "", ran.jobId));
+      }
+    }
+
+    // Performance table; rows use the same numbering as the list above.
+    report.add("");
+    report.add("Performance:");
+    report.add(String.format("  %4s  %12s  %12s  %12s  %12s  %12s  %12s", "Conf", "Runtime(sec)",
+        "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+    index = 0;
+    for (NexmarkConfiguration configuration : configurations) {
+      StringBuilder row = new StringBuilder(String.format("  %04d  ", index));
+      index++;
+      NexmarkPerf ran = actual.get(configuration);
+      if (ran == null) {
+        row.append("*** not run ***");
+        report.add(row.toString());
+        continue;
+      }
+
+      NexmarkPerf base = baseline == null ? null : baseline.get(configuration);
+
+      // Runtime, followed by its percentage change relative to the baseline.
+      double runtimeSec = ran.runtimeSec;
+      row.append(String.format("%12.1f  ", runtimeSec));
+      if (base != null) {
+        double baseRuntimeSec = base.runtimeSec;
+        double change = ((runtimeSec - baseRuntimeSec) / baseRuntimeSec) * 100.0;
+        row.append(String.format("%+11.2f%%  ", change));
+      } else {
+        row.append(String.format("%12s  ", ""));
+      }
+
+      // Throughput, followed by its percentage change relative to the baseline.
+      double eventsPerSec = ran.eventsPerSec;
+      row.append(String.format("%12.1f  ", eventsPerSec));
+      if (base != null) {
+        double baseEventsPerSec = base.eventsPerSec;
+        double change = ((eventsPerSec - baseEventsPerSec) / baseEventsPerSec) * 100.0;
+        row.append(String.format("%+11.2f%%  ", change));
+      } else {
+        row.append(String.format("%12s  ", ""));
+      }
+
+      // Result count, followed by its absolute difference from the baseline.
+      long numResults = ran.numResults;
+      row.append(String.format("%12d  ", numResults));
+      if (base != null) {
+        long baseNumResults = base.numResults;
+        long delta = numResults - baseNumResults;
+        row.append(String.format("%+12d", delta));
+      } else {
+        row.append(String.format("%12s", ""));
+      }
+      report.add(row.toString());
+
+      // Errors for this configuration go on their own lines under its row.
+      List<String> problems = ran.errors;
+      if (problems == null) {
+        problems = new ArrayList<>();
+        problems.add("NexmarkGoogleRunner returned null errors list");
+      }
+      for (String problem : problems) {
+        report.add(String.format("  %4s  *** %s ***", "", problem));
+      }
+    }
+
+    report.add(LINE);
+    report.add("");
+
+    for (String reportLine : report) {
+      System.out.println(reportLine);
+    }
+
+    if (summaryFilename == null) {
+      return;
+    }
+    try {
+      Files.write(Paths.get(summaryFilename), report, StandardCharsets.UTF_8,
+          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to save summary file: ", e);
+    }
+    NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+  }
+
+  /**
+   * Writes all perf data and any baselines as a javascript array named {@code all}, with one
+   * object per configuration, for use by a graphing page etc. Any existing file is replaced.
+   * Does nothing when {@code javascriptFilename} is null.
+   *
+   * @throws RuntimeException if the file cannot be written
+   */
+  private static void saveJavascript(
+      @Nullable String javascriptFilename,
+      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+    if (javascriptFilename == null) {
+      return;
+    }
+
+    List<String> script = new ArrayList<>();
+    script.add(String.format(
+        "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
+    script.add("var all = [");
+
+    for (NexmarkConfiguration configuration : configurations) {
+      NexmarkPerf ran = actual.get(configuration);
+      NexmarkPerf base = baseline == null ? null : baseline.get(configuration);
+
+      script.add("  {");
+      script.add(String.format("    config: %s", configuration));
+      // perf and baseline are optional properties, emitted only when a value exists.
+      if (ran != null) {
+        script.add(String.format("    ,perf: %s", ran));
+      }
+      if (base != null) {
+        script.add(String.format("    ,baseline: %s", base));
+      }
+      script.add("  },");
+    }
+
+    script.add("];");
+
+    try {
+      Files.write(
+          Paths.get(javascriptFilename),
+          script,
+          StandardCharsets.UTF_8,
+          StandardOpenOption.CREATE,
+          StandardOpenOption.TRUNCATE_EXISTING);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to save javascript file: ", e);
+    }
+    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+  }
+
+  /**
+   * Command-line entry point: parses and validates {@code args} as {@link NexmarkOptions},
+   * then runs all configurations with a launcher built from those options.
+   */
+  public static void main(String[] args) {
+    NexmarkOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
+    NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options);
+    Main<NexmarkOptions> runner = new Main<>();
+    runner.runAll(options, nexmarkLauncher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
new file mode 100644
index 0000000..f45c387
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A monitor of elements with support for later retrieving their metrics.
+ *
+ * @param <T> Type of element we are monitoring.
+ */
+public class Monitor<T extends KnownSize> implements Serializable {
+  private class MonitorDoFn extends DoFn<T, T> {
+    final Counter elementCounter =
+      Metrics.counter(name , prefix + ".elements");
+    final Counter bytesCounter =
+      Metrics.counter(name , prefix + ".bytes");
+    final Distribution startTime =
+      Metrics.distribution(name , prefix + ".startTime");
+    final Distribution endTime =
+      Metrics.distribution(name , prefix + ".endTime");
+    final Distribution startTimestamp =
+      Metrics.distribution(name , prefix + ".startTimestamp");
+    final Distribution endTimestamp =
+      Metrics.distribution(name , prefix + ".endTimestamp");
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      elementCounter.inc();
+      bytesCounter.inc(c.element().sizeInBytes());
+      long now = System.currentTimeMillis();
+      startTime.update(now);
+      endTime.update(now);
+      startTimestamp.update(c.timestamp().getMillis());
+      endTimestamp.update(c.timestamp().getMillis());
+      c.output(c.element());
+    }
+  }
+
+  public final String name;
+  public final String prefix;
+  private final MonitorDoFn doFn;
+  private final PTransform<PCollection<? extends T>, PCollection<T>> transform;
+
+  public Monitor(String name, String prefix) {
+    this.name = name;
+    this.prefix = prefix;
+    doFn = new MonitorDoFn();
+    transform = ParDo.of(doFn);
+  }
+
+  public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
+    return transform;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
new file mode 100644
index 0000000..904fcd5
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -0,0 +1,721 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Configuration controlling how a query is run. May be supplied by command line or
+ * programmatically. We only capture properties which may influence the resulting
+ * pipeline performance, as captured by {@link NexmarkPerf}.
+ */
+public class NexmarkConfiguration implements Serializable {
+  public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
+
+  /** If {@literal true}, include additional debugging and monitoring stats. */
+  @JsonProperty
+  public boolean debug = true;
+
+  /** Which query to run, in [0,9]. */
+  @JsonProperty
+  public int query = 0;
+
+  /** Where events come from. */
+  @JsonProperty
+  public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT;
+
+  /** Where results go to. */
+  @JsonProperty
+  public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
+
+  /**
+   * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
+   * into the overall query pipeline.
+   */
+  @JsonProperty
+  public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED;
+
+  /**
+   * Number of events to generate. If zero, generate as many as possible without overflowing
+   * internal counters etc.
+   */
+  @JsonProperty
+  public long numEvents = 100000;
+
+  /**
+   * Number of event generators to use. Each generates events in its own timeline.
+   */
+  @JsonProperty
+  public int numEventGenerators = 100;
+
+  /**
+   * Shape of event rate curve.
+   */
+  @JsonProperty
+  public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE;
+
+  /**
+   * Initial overall event rate (in {@link #rateUnit}).
+   */
+  @JsonProperty
+  public int firstEventRate = 10000;
+
+  /**
+   * Next overall event rate (in {@link #rateUnit}).
+   */
+  @JsonProperty
+  public int nextEventRate = 10000;
+
+  /**
+   * Unit for rates.
+   */
+  @JsonProperty
+  public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND;
+
+  /**
+   * Overall period of rate shape, in seconds.
+   */
+  @JsonProperty
+  public int ratePeriodSec = 600;
+
+  /**
+   * Time in seconds to preload the subscription with data, at the initial input rate of the
+   * pipeline.
+   */
+  @JsonProperty
+  public int preloadSeconds = 0;
+
+  /**
+   * Timeout for stream pipelines to stop in seconds.
+   */
+  @JsonProperty
+  public int streamTimeout = 240;
+
+  /**
+   * If true, and in streaming mode, generate events only when they are due according to their
+   * timestamp.
+   */
+  @JsonProperty
+  public boolean isRateLimited = false;
+
+  /**
+   * If true, use wallclock time as event time. Otherwise, use a deterministic
+   * time in the past so that multiple runs will see exactly the same event streams
+   * and should thus have exactly the same results.
+   */
+  @JsonProperty
+  public boolean useWallclockEventTime = false;
+
+  /** Average idealized size of a 'new person' event, in bytes. */
+  @JsonProperty
+  public int avgPersonByteSize = 200;
+
+  /** Average idealized size of a 'new auction' event, in bytes. */
+  @JsonProperty
+  public int avgAuctionByteSize = 500;
+
+  /** Average idealized size of a 'bid' event, in bytes. */
+  @JsonProperty
+  public int avgBidByteSize = 100;
+
+  /** Ratio of bids to 'hot' auctions compared to all other auctions. */
+  @JsonProperty
+  public int hotAuctionRatio = 2;
+
+  /** Ratio of auctions for 'hot' sellers compared to all other people. */
+  @JsonProperty
+  public int hotSellersRatio = 4;
+
+  /** Ratio of bids for 'hot' bidders compared to all other people. */
+  @JsonProperty
+  public int hotBiddersRatio = 4;
+
+  /** Window size, in seconds, for queries 3, 5, 7 and 8. */
+  @JsonProperty
+  public long windowSizeSec = 10;
+
+  /** Sliding window period, in seconds, for query 5. */
+  @JsonProperty
+  public long windowPeriodSec = 5;
+
+  /** Number of seconds to hold back events according to their reported timestamp. */
+  @JsonProperty
+  public long watermarkHoldbackSec = 0;
+
+  /** Average number of auction which should be inflight at any time, per generator. */
+  @JsonProperty
+  public int numInFlightAuctions = 100;
+
+  /** Maximum number of people to consider as active for placing auctions or bids. */
+  @JsonProperty
+  public int numActivePeople = 1000;
+
+  /** Coder strategy to follow. */
+  @JsonProperty
+  public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND;
+
+  /**
+   * Delay, in milliseconds, for each event. This will peg one core for this number
+   * of milliseconds to simulate CPU-bound computation.
+   */
+  @JsonProperty
+  public long cpuDelayMs = 0;
+
+  /**
+   * Extra data, in bytes, to save to persistent state for each event. This will force
+   * i/o all the way to durable storage to simulate an I/O-bound computation.
+   */
+  @JsonProperty
+  public long diskBusyBytes = 0;
+
+  /**
+   * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction.
+   */
+  @JsonProperty
+  public int auctionSkip = 123;
+
+  /**
+   * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum).
+   */
+  @JsonProperty
+  public int fanout = 5;
+
+  /**
+   * Maximum waiting time to clean personState in query3
+   * (ie maximum waiting of the auctions related to person in state in seconds in event time).
+   */
+  @JsonProperty
+  public int maxAuctionsWaitingTime = 600;
+
+  /**
+   * Length of occasional delay to impose on events (in seconds).
+   */
+  @JsonProperty
+  public long occasionalDelaySec = 3;
+
+  /**
+   * Probability that an event will be delayed.
+   * NOTE(review): the delay length is presumably {@link #occasionalDelaySec}; confirm.
+   */
+  @JsonProperty
+  public double probDelayedEvent = 0.1;
+
+  /**
+   * Maximum size of each log file (in events). For Query10 only.
+   */
+  @JsonProperty
+  public int maxLogEvents = 100_000;
+
+  /**
+   * If true, use pub/sub publish time instead of event time.
+   */
+  @JsonProperty
+  public boolean usePubsubPublishTime = false;
+
+  /**
+   * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies
+   * every 1000 events per generator are emitted in pseudo-random order.
+   */
+  @JsonProperty
+  public long outOfOrderGroupSize = 1;
+
+  /**
+   * Replace any properties of this configuration which have been supplied by the command line.
+   */
+  public void overrideFromOptions(NexmarkOptions options) {
+    if (options.getDebug() != null) {
+      debug = options.getDebug();
+    }
+    if (options.getQuery() != null) {
+      query = options.getQuery();
+    }
+    if (options.getSourceType() != null) {
+      sourceType = options.getSourceType();
+    }
+    if (options.getSinkType() != null) {
+      sinkType = options.getSinkType();
+    }
+    if (options.getPubSubMode() != null) {
+      pubSubMode = options.getPubSubMode();
+    }
+    if (options.getNumEvents() != null) {
+      numEvents = options.getNumEvents();
+    }
+    if (options.getNumEventGenerators() != null) {
+      numEventGenerators = options.getNumEventGenerators();
+    }
+    if (options.getRateShape() != null) {
+      rateShape = options.getRateShape();
+    }
+    if (options.getFirstEventRate() != null) {
+      firstEventRate = options.getFirstEventRate();
+    }
+    if (options.getNextEventRate() != null) {
+      nextEventRate = options.getNextEventRate();
+    }
+    if (options.getRateUnit() != null) {
+      rateUnit = options.getRateUnit();
+    }
+    if (options.getRatePeriodSec() != null) {
+      ratePeriodSec = options.getRatePeriodSec();
+    }
+    if (options.getPreloadSeconds() != null) {
+      preloadSeconds = options.getPreloadSeconds();
+    }
+    if (options.getStreamTimeout() != null) {
+      streamTimeout = options.getStreamTimeout();
+    }
+    if (options.getIsRateLimited() != null) {
+      isRateLimited = options.getIsRateLimited();
+    }
+    if (options.getUseWallclockEventTime() != null) {
+      useWallclockEventTime = options.getUseWallclockEventTime();
+    }
+    if (options.getAvgPersonByteSize() != null) {
+      avgPersonByteSize = options.getAvgPersonByteSize();
+    }
+    if (options.getAvgAuctionByteSize() != null) {
+      avgAuctionByteSize = options.getAvgAuctionByteSize();
+    }
+    if (options.getAvgBidByteSize() != null) {
+      avgBidByteSize = options.getAvgBidByteSize();
+    }
+    if (options.getHotAuctionRatio() != null) {
+      hotAuctionRatio = options.getHotAuctionRatio();
+    }
+    if (options.getHotSellersRatio() != null) {
+      hotSellersRatio = options.getHotSellersRatio();
+    }
+    if (options.getHotBiddersRatio() != null) {
+      hotBiddersRatio = options.getHotBiddersRatio();
+    }
+    if (options.getWindowSizeSec() != null) {
+      windowSizeSec = options.getWindowSizeSec();
+    }
+    if (options.getWindowPeriodSec() != null) {
+      windowPeriodSec = options.getWindowPeriodSec();
+    }
+    if (options.getWatermarkHoldbackSec() != null) {
+      watermarkHoldbackSec = options.getWatermarkHoldbackSec();
+    }
+    if (options.getNumInFlightAuctions() != null) {
+      numInFlightAuctions = options.getNumInFlightAuctions();
+    }
+    if (options.getNumActivePeople() != null) {
+      numActivePeople = options.getNumActivePeople();
+    }
+    if (options.getCoderStrategy() != null) {
+      coderStrategy = options.getCoderStrategy();
+    }
+    if (options.getCpuDelayMs() != null) {
+      cpuDelayMs = options.getCpuDelayMs();
+    }
+    if (options.getDiskBusyBytes() != null) {
+      diskBusyBytes = options.getDiskBusyBytes();
+    }
+    if (options.getAuctionSkip() != null) {
+      auctionSkip = options.getAuctionSkip();
+    }
+    if (options.getFanout() != null) {
+      fanout = options.getFanout();
+    }
+    if (options.getMaxAuctionsWaitingTime() != null) {
+      fanout = options.getMaxAuctionsWaitingTime();
+    }
+    if (options.getOccasionalDelaySec() != null) {
+      occasionalDelaySec = options.getOccasionalDelaySec();
+    }
+    if (options.getProbDelayedEvent() != null) {
+      probDelayedEvent = options.getProbDelayedEvent();
+    }
+    if (options.getMaxLogEvents() != null) {
+      maxLogEvents = options.getMaxLogEvents();
+    }
+    if (options.getUsePubsubPublishTime() != null) {
+      usePubsubPublishTime = options.getUsePubsubPublishTime();
+    }
+    if (options.getOutOfOrderGroupSize() != null) {
+      outOfOrderGroupSize = options.getOutOfOrderGroupSize();
+    }
+  }
+
+  /**
+   * Returns a new configuration with each of the fields assigned below copied from this one.
+   * Changing the returned object's fields does not affect this configuration.
+   */
+  public NexmarkConfiguration copy() {
+    NexmarkConfiguration duplicate = new NexmarkConfiguration();
+    duplicate.debug = debug;
+    duplicate.query = query;
+    duplicate.sourceType = sourceType;
+    duplicate.sinkType = sinkType;
+    duplicate.pubSubMode = pubSubMode;
+    duplicate.numEvents = numEvents;
+    duplicate.numEventGenerators = numEventGenerators;
+    duplicate.rateShape = rateShape;
+    duplicate.firstEventRate = firstEventRate;
+    duplicate.nextEventRate = nextEventRate;
+    duplicate.rateUnit = rateUnit;
+    duplicate.ratePeriodSec = ratePeriodSec;
+    duplicate.preloadSeconds = preloadSeconds;
+    duplicate.streamTimeout = streamTimeout;
+    duplicate.isRateLimited = isRateLimited;
+    duplicate.useWallclockEventTime = useWallclockEventTime;
+    duplicate.avgPersonByteSize = avgPersonByteSize;
+    duplicate.avgAuctionByteSize = avgAuctionByteSize;
+    duplicate.avgBidByteSize = avgBidByteSize;
+    duplicate.hotAuctionRatio = hotAuctionRatio;
+    duplicate.hotSellersRatio = hotSellersRatio;
+    duplicate.hotBiddersRatio = hotBiddersRatio;
+    duplicate.windowSizeSec = windowSizeSec;
+    duplicate.windowPeriodSec = windowPeriodSec;
+    duplicate.watermarkHoldbackSec = watermarkHoldbackSec;
+    duplicate.numInFlightAuctions = numInFlightAuctions;
+    duplicate.numActivePeople = numActivePeople;
+    duplicate.coderStrategy = coderStrategy;
+    duplicate.cpuDelayMs = cpuDelayMs;
+    duplicate.diskBusyBytes = diskBusyBytes;
+    duplicate.auctionSkip = auctionSkip;
+    duplicate.fanout = fanout;
+    duplicate.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
+    duplicate.occasionalDelaySec = occasionalDelaySec;
+    duplicate.probDelayedEvent = probDelayedEvent;
+    duplicate.maxLogEvents = maxLogEvents;
+    duplicate.usePubsubPublishTime = usePubsubPublishTime;
+    duplicate.outOfOrderGroupSize = outOfOrderGroupSize;
+    return duplicate;
+  }
+
+  /**
+   * Return short description of configuration (suitable for use in logging). We only render
+   * the core fields plus those which do not have default values.
+   */
+  public String toShortString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(String.format("query:%d", query));
+    if (debug != DEFAULT.debug) {
+      sb.append(String.format("; debug:%s", debug));
+    }
+    if (sourceType != DEFAULT.sourceType) {
+      sb.append(String.format("; sourceType:%s", sourceType));
+    }
+    if (sinkType != DEFAULT.sinkType) {
+      sb.append(String.format("; sinkType:%s", sinkType));
+    }
+    if (pubSubMode != DEFAULT.pubSubMode) {
+      sb.append(String.format("; pubSubMode:%s", pubSubMode));
+    }
+    if (numEvents != DEFAULT.numEvents) {
+      sb.append(String.format("; numEvents:%d", numEvents));
+    }
+    if (numEventGenerators != DEFAULT.numEventGenerators) {
+      sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
+    }
+    if (rateShape != DEFAULT.rateShape) {
+      sb.append(String.format("; rateShape:%s", rateShape));
+    }
+    if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) {
+      sb.append(String.format("; firstEventRate:%d", firstEventRate));
+      sb.append(String.format("; nextEventRate:%d", nextEventRate));
+    }
+    if (rateUnit != DEFAULT.rateUnit) {
+      sb.append(String.format("; rateUnit:%s", rateUnit));
+    }
+    if (ratePeriodSec != DEFAULT.ratePeriodSec) {
+      sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec));
+    }
+    if (preloadSeconds != DEFAULT.preloadSeconds) {
+      sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
+    }
+    if (streamTimeout != DEFAULT.streamTimeout) {
+      sb.append(String.format("; streamTimeout:%d", streamTimeout));
+    }
+    if (isRateLimited != DEFAULT.isRateLimited) {
+      sb.append(String.format("; isRateLimited:%s", isRateLimited));
+    }
+    if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
+      sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
+    }
+    if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
+      sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
+    }
+    if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
+      sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
+    }
+    if (avgBidByteSize != DEFAULT.avgBidByteSize) {
+      sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
+    }
+    if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
+      sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
+    }
+    if (hotSellersRatio != DEFAULT.hotSellersRatio) {
+      sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
+    }
+    if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
+      sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
+    }
+    if (windowSizeSec != DEFAULT.windowSizeSec) {
+      sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
+    }
+    if (windowPeriodSec != DEFAULT.windowPeriodSec) {
+      sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
+    }
+    if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
+      sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
+    }
+    if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
+      sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
+    }
+    if (numActivePeople != DEFAULT.numActivePeople) {
+      sb.append(String.format("; numActivePeople:%d", numActivePeople));
+    }
+    if (coderStrategy != DEFAULT.coderStrategy) {
+      sb.append(String.format("; coderStrategy:%s", coderStrategy));
+    }
+    if (cpuDelayMs != DEFAULT.cpuDelayMs) {
+      sb.append(String.format("; cpuSlowdownMs:%d", cpuDelayMs));
+    }
+    if (diskBusyBytes != DEFAULT.diskBusyBytes) {
+      sb.append(String.format("; diskBuysBytes:%d", diskBusyBytes));
+    }
+    if (auctionSkip != DEFAULT.auctionSkip) {
+      sb.append(String.format("; auctionSkip:%d", auctionSkip));
+    }
+    if (fanout != DEFAULT.fanout) {
+      sb.append(String.format("; fanout:%d", fanout));
+    }
+    if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
+      sb.append(String.format("; maxAuctionsWaitingTime:%d", fanout));
+    }
+    if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
+      sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
+    }
+    if (probDelayedEvent != DEFAULT.probDelayedEvent) {
+      sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
+    }
+    if (maxLogEvents != DEFAULT.maxLogEvents) {
+      sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
+    }
+    if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
+      sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
+    }
+    if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
+      sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Render this configuration, with every field, as a single string.
+   *
+   * <p>Serialization is delegated to {@code NexmarkUtils.MAPPER}; a serialization
+   * failure is rethrown unchecked because {@code toString} cannot declare it.
+   */
+  @Override
+  public String toString() {
+    final String serialized;
+    try {
+      serialized = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    return serialized;
+  }
+
+  /**
+   * Parse an object from {@code string}.
+   *
+   * <p>The text is read with {@code NexmarkUtils.MAPPER}, the same mapper that
+   * {@link #toString()} writes with.
+   *
+   * @param string serialized configuration to read
+   * @return the configuration decoded from {@code string}
+   * @throws RuntimeException if the mapper reports an {@link IOException}; the message
+   *     carries the rejected input and the original exception is kept as the cause
+   */
+  public static NexmarkConfiguration fromString(String string) {
+    try {
+      return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
+    } catch (IOException e) {
+      // The message used to end in a dangling ": "; include the offending input so the
+      // failure can be diagnosed from the exception alone.
+      throw new RuntimeException("Unable to parse nexmark configuration: " + string, e);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        debug,
+        query,
+        sourceType,
+        sinkType,
+        pubSubMode,
+        numEvents,
+        numEventGenerators,
+        rateShape,
+        firstEventRate,
+        nextEventRate,
+        rateUnit,
+        ratePeriodSec,
+        preloadSeconds,
+        streamTimeout,
+        isRateLimited,
+        useWallclockEventTime,
+        avgPersonByteSize,
+        avgAuctionByteSize,
+        avgBidByteSize,
+        hotAuctionRatio,
+        hotSellersRatio,
+        hotBiddersRatio,
+        windowSizeSec,
+        windowPeriodSec,
+        watermarkHoldbackSec,
+        numInFlightAuctions,
+        numActivePeople,
+        coderStrategy,
+        cpuDelayMs,
+        diskBusyBytes,
+        auctionSkip,
+        fanout,
+        maxAuctionsWaitingTime,
+        occasionalDelaySec,
+        probDelayedEvent,
+        maxLogEvents,
+        usePubsubPublishTime,
+        outOfOrderGroupSize);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    NexmarkConfiguration other = (NexmarkConfiguration) obj;
+    if (debug != other.debug) {
+      return false;
+    }
+    if (auctionSkip != other.auctionSkip) {
+      return false;
+    }
+    if (avgAuctionByteSize != other.avgAuctionByteSize) {
+      return false;
+    }
+    if (avgBidByteSize != other.avgBidByteSize) {
+      return false;
+    }
+    if (avgPersonByteSize != other.avgPersonByteSize) {
+      return false;
+    }
+    if (coderStrategy != other.coderStrategy) {
+      return false;
+    }
+    if (cpuDelayMs != other.cpuDelayMs) {
+      return false;
+    }
+    if (diskBusyBytes != other.diskBusyBytes) {
+      return false;
+    }
+    if (fanout != other.fanout) {
+      return false;
+    }
+    if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
+      return false;
+    }
+    if (firstEventRate != other.firstEventRate) {
+      return false;
+    }
+    if (hotAuctionRatio != other.hotAuctionRatio) {
+      return false;
+    }
+    if (hotBiddersRatio != other.hotBiddersRatio) {
+      return false;
+    }
+    if (hotSellersRatio != other.hotSellersRatio) {
+      return false;
+    }
+    if (isRateLimited != other.isRateLimited) {
+      return false;
+    }
+    if (maxLogEvents != other.maxLogEvents) {
+      return false;
+    }
+    if (nextEventRate != other.nextEventRate) {
+      return false;
+    }
+    if (rateUnit != other.rateUnit) {
+      return false;
+    }
+    if (numEventGenerators != other.numEventGenerators) {
+      return false;
+    }
+    if (numEvents != other.numEvents) {
+      return false;
+    }
+    if (numInFlightAuctions != other.numInFlightAuctions) {
+      return false;
+    }
+    if (numActivePeople != other.numActivePeople) {
+      return false;
+    }
+    if (occasionalDelaySec != other.occasionalDelaySec) {
+      return false;
+    }
+    if (preloadSeconds != other.preloadSeconds) {
+      return false;
+    }
+    if (streamTimeout != other.streamTimeout) {
+      return false;
+    }
+    if (Double.doubleToLongBits(probDelayedEvent)
+        != Double.doubleToLongBits(other.probDelayedEvent)) {
+      return false;
+    }
+    if (pubSubMode != other.pubSubMode) {
+      return false;
+    }
+    if (ratePeriodSec != other.ratePeriodSec) {
+      return false;
+    }
+    if (rateShape != other.rateShape) {
+      return false;
+    }
+    if (query != other.query) {
+      return false;
+    }
+    if (sinkType != other.sinkType) {
+      return false;
+    }
+    if (sourceType != other.sourceType) {
+      return false;
+    }
+    if (useWallclockEventTime != other.useWallclockEventTime) {
+      return false;
+    }
+    if (watermarkHoldbackSec != other.watermarkHoldbackSec) {
+      return false;
+    }
+    if (windowPeriodSec != other.windowPeriodSec) {
+      return false;
+    }
+    if (windowSizeSec != other.windowSizeSec) {
+      return false;
+    }
+    if (usePubsubPublishTime != other.usePubsubPublishTime) {
+      return false;
+    }
+    if (outOfOrderGroupSize != other.outOfOrderGroupSize) {
+      return false;
+    }
+    return true;
+  }
+}