You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:54 UTC
[46/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
deleted file mode 100644
index b0c3853..0000000
--- a/integration/java/pom.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-integration-parent</artifactId>
- <version>2.1.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-integration-java-parent</artifactId>
- <packaging>pom</packaging>
- <name>Apache Beam :: Integration Tests :: Java</name>
-
- <modules>
- <module>nexmark</module>
- </modules>
-
-</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
deleted file mode 100644
index 4254819..0000000
--- a/integration/pom.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-parent</artifactId>
- <version>2.1.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-integration-parent</artifactId>
- <packaging>pom</packaging>
- <name>Apache Beam :: Integration Tests</name>
-
- <profiles>
- <profile>
- <id>release</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
- <modules>
- <module>java</module>
- </modules>
-
-</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bddbf1f..5fd1297 100644
--- a/pom.xml
+++ b/pom.xml
@@ -187,7 +187,6 @@
<module>sdks</module>
<module>runners</module>
<module>examples</module>
- <module>integration</module>
<!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
<module>sdks/java/javadoc</module>
</modules>
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/README.md b/sdks/java/nexmark/README.md
new file mode 100644
index 0000000..a9acd63
--- /dev/null
+++ b/sdks/java/nexmark/README.md
@@ -0,0 +1,340 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# NEXMark integration suite
+
+This is a suite of pipelines inspired by the 'continuous data stream'
+queries in [http://datalab.cs.pdx.edu/niagaraST/NEXMark/](http://datalab.cs.pdx.edu/niagaraST/NEXMark/).
+
+These are multiple queries over a three-entity model representing an online auction system:
+
+ - **Person** represents a person submitting an item for auction and/or making a bid
+ on an auction.
+ - **Auction** represents an item under auction.
+ - **Bid** represents a bid for an item under auction.
+
+The queries exercise many aspects of the Beam model:
+
+* **Query1**: What are the bid values in Euros?
+ Illustrates a simple map.
+* **Query2**: What are the auctions with particular auction numbers?
+ Illustrates a simple filter.
+* **Query3**: Who is selling in particular US states?
+ Illustrates an incremental join (using per-key state and timer) and filter.
+* **Query4**: What is the average selling price for each auction
+ category?
+ Illustrates complex join (using custom window functions) and
+ aggregation.
+* **Query5**: Which auctions have seen the most bids in the last period?
+ Illustrates sliding windows and combiners.
+* **Query6**: What is the average selling price per seller for their
+ last 10 closed auctions?
+ Shares the same 'winning bids' core as for **Query4**, and
+ illustrates a specialized combiner.
+* **Query7**: What are the highest bids per period?
+ Deliberately implemented using a side input to illustrate fanout.
+* **Query8**: Who has entered the system and created an auction in
+ the last period?
+ Illustrates a simple join.
+
+We have augmented the original queries with five more:
+
+* **Query0**: Pass-through.
+ Allows us to measure the monitoring overhead.
+* **Query9**: Winning-bids.
+ A common sub-query shared by **Query4** and **Query6**.
+* **Query10**: Log all events to GCS files.
+ Illustrates windows with large side effects on firing.
+* **Query11**: How many bids did a user make in each session they
+ were active?
+ Illustrates session windows.
+* **Query12**: How many bids does a user make within a fixed
+ processing time limit?
+ Illustrates working in processing time in the Global window, as
+ compared with event time in non-Global windows for all the other
+ queries.
+
+We can specify the Beam runner to use with Maven profiles. The available profiles are:
+
+* direct-runner
+* spark-runner
+* flink-runner
+* apex-runner
+
+The runner must also be specified like in any other Beam pipeline using
+
+ --runner
+
+
+Test data is deterministically synthesized on demand. The test
+data may be synthesized in the same pipeline as the query itself,
+or may be published to Pubsub.
+
+The query results may be:
+
+* Published to Pubsub.
+* Written to text files as plain text.
+* Written to text files using an Avro encoding.
+* Sent to BigQuery.
+* Discarded.
+
+# Configuration
+
+## Common configuration parameters
+
+Decide if batch or streaming:
+
+ --streaming=true
+
+Number of event generators:
+
+ --numEventGenerators=4
+
+Run query N
+
+ --query=N
+
+## Available Suites
+The suite to run can be chosen using this configuration parameter:
+
+ --suite=SUITE
+
+Available suites are:
+* DEFAULT: Test default configuration with query 0.
+* SMOKE: Run the 12 default configurations.
+* STRESS: Like smoke but for 1m events.
+* FULL_THROTTLE: Like SMOKE but 100m events.
+
+
+
+## Apex specific configuration
+
+ --manageResources=false --monitorJobs=false
+
+## Dataflow specific configuration
+
+ --manageResources=false --monitorJobs=true \
+ --enforceEncodability=false --enforceImmutability=false \
+ --project=<your project> \
+ --zone=<your zone> \
+ --workerMachineType=n1-highmem-8 \
+ --stagingLocation=<a gs path for staging> \
+ --runner=DataflowRunner \
+ --tempLocation=gs://talend-imejia/nexmark/temp/ \
+ --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
+ --filesToStage=target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar
+
+## Direct specific configuration
+
+ --manageResources=false --monitorJobs=true \
+ --enforceEncodability=false --enforceImmutability=false
+
+## Flink specific configuration
+
+ --manageResources=false --monitorJobs=true \
+ --flinkMaster=local --parallelism=#numcores
+
+## Spark specific configuration
+
+ --manageResources=false --monitorJobs=true \
+ --sparkMaster=local \
+ -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
+
+# Current Status
+
+Open issues are tracked [here](https://issues.apache.org/jira/browse/BEAM):
+
+## Batch / Synthetic / Local
+
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+| 0 | ok | ok | ok | ok |
+| 1 | ok | ok | ok | ok |
+| 2 | ok | ok | ok | ok |
+| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+| 4 | ok | ok | ok | ok |
+| 5 | ok | ok | ok | ok |
+| 6 | ok | ok | ok | ok |
+| 7 | ok | ok | ok | ok |
+| 8 | ok | ok | ok | ok |
+| 9 | ok | ok | ok | ok |
+| 10 | ok | ok | ok | ok |
+| 11 | ok | ok | ok | ok |
+| 12 | ok | ok | ok | ok |
+
+## Streaming / Synthetic / Local
+
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+| 0 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 1 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 2 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+| 4 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 5 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 6 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 7 | ok | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 8 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 9 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 10 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 11 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 12 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+
+## Batch / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow | Spark | Flink | Apex |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+| 0 | | | | |
+
+## Streaming / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow | Spark | Flink | Apex |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+| 0 | | | | |
+
+# Running Nexmark
+
+## Running SMOKE suite on the DirectRunner (local)
+
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
+
+## Running SMOKE suite on the SparkRunner (local)
+
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
+
+
+## Running SMOKE suite on the FlinkRunner (local)
+
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local"
+
+
+## Running SMOKE suite on the ApexRunner (local)
+
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.sdk.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
+
+
+## Running SMOKE suite on Google Cloud Dataflow
+
+Building package
+
+ mvn clean package -Pdataflow-runner
+
+Submit to Google Dataflow service
+
+
+```
+java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+ org.apache.beam.sdk.nexmark.Main \
+ --runner=DataflowRunner \
+ --project=<your project> \
+ --zone=<your zone> \
+ --workerMachineType=n1-highmem-8 \
+ --stagingLocation=<a gs path for staging> \
+ --streaming=true \
+ --sourceType=PUBSUB \
+ --pubSubMode=PUBLISH_ONLY \
+ --pubsubTopic=<an existing Pubsub topic> \
+ --resourceNameMode=VERBATIM \
+ --manageResources=false \
+ --monitorJobs=false \
+ --numEventGenerators=64 \
+ --numWorkers=16 \
+ --maxNumWorkers=16 \
+ --suite=SMOKE \
+ --firstEventRate=100000 \
+ --nextEventRate=100000 \
+ --ratePeriodSec=3600 \
+ --isRateLimited=true \
+ --avgPersonByteSize=500 \
+ --avgAuctionByteSize=500 \
+ --avgBidByteSize=500 \
+ --probDelayedEvent=0.000001 \
+ --occasionalDelaySec=3600 \
+ --numEvents=0 \
+ --useWallclockEventTime=true \
+ --usePubsubPublishTime=true \
+ --experiments=enable_custom_pubsub_sink
+```
+
+```
+java -cp sdks/java/nexmark/target/beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+ org.apache.beam.sdk.nexmark.Main \
+ --runner=DataflowRunner \
+ --project=<your project> \
+ --zone=<your zone> \
+ --workerMachineType=n1-highmem-8 \
+ --stagingLocation=<a gs path for staging> \
+ --streaming=true \
+ --sourceType=PUBSUB \
+ --pubSubMode=SUBSCRIBE_ONLY \
+ --pubsubSubscription=<an existing Pubsub subscription to above topic> \
+ --resourceNameMode=VERBATIM \
+ --manageResources=false \
+ --monitorJobs=false \
+ --numWorkers=64 \
+ --maxNumWorkers=64 \
+ --suite=SMOKE \
+ --usePubsubPublishTime=true \
+ --outputPath=<a gs path under which log files will be written> \
+ --windowSizeSec=600 \
+ --occasionalDelaySec=3600 \
+ --maxLogEvents=10000 \
+ --experiments=enable_custom_pubsub_source
+```
+
+## Running query 0 on a Spark cluster with yarn
+
+Building package
+
+ mvn clean package -Pspark-runner
+
+Submit to the cluster
+
+ spark-submit --master yarn-client --class org.apache.beam.sdk.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-sdks-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true
+
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/pom.xml b/sdks/java/nexmark/pom.xml
new file mode 100644
index 0000000..c1b6025
--- /dev/null
+++ b/sdks/java/nexmark/pom.xml
@@ -0,0 +1,292 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-parent</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-nexmark</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: Nexmark</name>
+ <packaging>jar</packaging>
+
+ <profiles>
+
+ <!--
+ The direct runner is available by default.
+ You can also include it on the classpath explicitly with -P direct-runner
+ -->
+ <profile>
+ <id>direct-runner</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Include the Apache Apex runner with -P apex-runner -->
+ <profile>
+ <id>apex-runner</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-apex</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <!--
+ Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
+ google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
+ can be removed when the project no longer has a dependency on a different httpclient version.
+ -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.3.5</version>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Include the Apache Flink runner with -P flink-runner -->
+ <profile>
+ <id>flink-runner</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-flink_2.10</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Include the Apache Spark runner -P spark-runner -->
+ <profile>
+ <id>spark-runner</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-spark</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Include the Google Cloud Dataflow runner -P dataflow-runner -->
+ <profile>
+ <id>dataflow-runner</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Avro plugin for automatic code generation -->
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>schemas</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/</sourceDirectory>
+ <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for unit tests. -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Java SDK -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <!-- IOs -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ </dependency>
+
+ <!-- Extra libraries -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ </dependency>
+
+ <!-- Test -->
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
new file mode 100644
index 0000000..ab2284c
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of the 'NEXMark queries' for Beam.
+ * These are multiple queries over a three table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of the Beam model.
+ *
+ * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+public class Main<OptionT extends NexmarkOptions> {
+
+ /**
+ * Entry point: runs every configuration of the suite selected by {@code options} and
+ * reports the results.
+ *
+ * <p>Each configuration is handed to {@code nexmarkLauncher}. When the launcher returns a
+ * non-null {@link NexmarkPerf}, the result is appended to the perf file, recorded, and a
+ * running summary is produced. If job monitoring is enabled, the final summary and the
+ * javascript report are saved in a {@code finally} block, i.e. even when a run throws.
+ *
+ * @param options supplies the suite and the baseline, perf, summary and javascript filenames
+ * @param nexmarkLauncher launcher used to execute each configuration
+ * @throws RuntimeException if any executed configuration came back with a null or non-empty
+ * error list
+ */
+ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
+ Instant start = Instant.now();
+ // May be null when no baseline filename was given.
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
+ // LinkedHashMap keeps results in the order the configurations were run.
+ Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
+ Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
+
+ boolean successful = true;
+ try {
+ // Run all the configurations.
+ for (NexmarkConfiguration configuration : configurations) {
+ NexmarkPerf perf = nexmarkLauncher.run(configuration);
+ // A null perf is skipped entirely: nothing is recorded and it does not count as a failure.
+ if (perf != null) {
+ // NOTE(review): a null error list is treated as a failure, same as a non-empty one --
+ // confirm that this is intended.
+ if (perf.errors == null || perf.errors.size() > 0) {
+ successful = false;
+ }
+ appendPerf(options.getPerfFilename(), configuration, perf);
+ actual.put(configuration, perf);
+ // Summarize what we've run so far (no summary filename is passed for this
+ // intermediate report).
+ saveSummary(null, configurations, actual, baseline, start);
+ }
+ }
+ } finally {
+ if (options.getMonitorJobs()) {
+ // Report overall performance.
+ saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
+ saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
+ }
+ }
+
+ // Fail only after all configurations have run and the reports have been written.
+ if (!successful) {
+ throw new RuntimeException("Execution was not successful");
+ }
+ }
+
+ /**
+ * Append the pair of {@code configuration} and {@code perf} to perf file.
+ *
+ * <p>The entry consists of an empty line, two {@code #}-prefixed lines (the current time
+ * and the short form of the configuration), then {@code configuration.toString()} and
+ * {@code perf.toString()} on one line each. The file is created if it does not exist.
+ *
+ * @param perfFilename path of the perf file, or {@code null} to do nothing
+ * @param configuration configuration that was run
+ * @param perf performance result for {@code configuration}
+ * @throws RuntimeException if the perf file cannot be written
+ */
+ private void appendPerf(
+ @Nullable String perfFilename, NexmarkConfiguration configuration,
+ NexmarkPerf perf) {
+ // No perf file requested: nothing to do.
+ if (perfFilename == null) {
+ return;
+ }
+ List<String> lines = new ArrayList<>();
+ lines.add("");
+ lines.add(String.format("# %s", Instant.now()));
+ lines.add(String.format("# %s", configuration.toShortString()));
+ lines.add(configuration.toString());
+ lines.add(perf.toString());
+ try {
+ // CREATE + APPEND: create the file on first use, otherwise add to the end of it.
+ Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
+ StandardOpenOption.APPEND);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to write perf file: ", e);
+ }
+ NexmarkUtils.console("appended results to perf file %s.", perfFilename);
+ }
+
+ /**
+ * Load the baseline perf.
+ *
+ * <p>The file is read as UTF-8. Lines that start with {@code #} and lines that are blank
+ * after trimming are skipped. Every other line is parsed as a {@link NexmarkConfiguration},
+ * and the line immediately following it is parsed as the matching {@link NexmarkPerf}.
+ *
+ * @param baselineFilename path of the baseline file, or {@code null} if there is none
+ * @return the baseline entries in file order, or {@code null} if {@code baselineFilename}
+ * is {@code null}
+ * @throws RuntimeException if the baseline file cannot be read
+ */
+ @Nullable
+ private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
+ @Nullable String baselineFilename) {
+ if (baselineFilename == null) {
+ return null;
+ }
+ Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
+ List<String> lines;
+ try {
+ lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to read baseline perf file: ", e);
+ }
+ for (int i = 0; i < lines.size(); i++) {
+ // Skip comment lines and blank separator lines.
+ if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) {
+ continue;
+ }
+ // The configuration line is consumed here (note the i++); the perf is read from the
+ // line right after it.
+ NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++));
+ // NOTE(review): if the configuration line is the last line of the file, this get(i)
+ // throws IndexOutOfBoundsException -- consider a bounds check with a clearer error.
+ NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i));
+ baseline.put(configuration, perf);
+ }
+ NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
+ baselineFilename);
+ return baseline;
+ }
+
+ private static final String LINE =
+ "==========================================================================================";
+
+ /**
+  * Print a summary of {@code actual} vs (if non-null) {@code baseline}.
+  *
+  * <p>The summary is always written to standard output. When {@code summaryFilename} is
+  * non-null the same lines are also appended to that file as UTF-8, creating it if needed.
+  *
+  * @param summaryFilename file to append the summary to, or {@code null} to only print it
+  * @param configurations configurations to report on, in report order
+  * @param actual perf of the configurations which were run
+  * @param baseline perf to compare against, or {@code null} for no comparison
+  * @param start instant the run started at
+  * @throws RuntimeException if the summary file cannot be written
+  */
+ private static void saveSummary(
+     @Nullable String summaryFilename,
+     Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+     @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+   List<String> report = new ArrayList<>();
+
+   // Header.
+   report.add("");
+   report.add(LINE);
+   Duration elapsed = new Duration(start, Instant.now());
+   report.add(String.format("Run started %s and ran for %s", start, elapsed));
+   report.add("");
+
+   report.add("Default configuration:");
+   report.add(NexmarkConfiguration.DEFAULT.toString());
+   report.add("");
+
+   // One line per configuration, plus the job id when one was recorded.
+   report.add("Configurations:");
+   report.add(" Conf Description");
+   int index = 0;
+   for (NexmarkConfiguration configuration : configurations) {
+     report.add(String.format(" %04d %s", index, configuration.toShortString()));
+     index++;
+     NexmarkPerf ran = actual.get(configuration);
+     if (ran == null || ran.jobId == null) {
+       continue;
+     }
+     report.add(String.format(" %4s [Ran as job %s]", "", ran.jobId));
+   }
+
+   // Performance table: each measure is followed by its difference from the baseline.
+   report.add("");
+   report.add("Performance:");
+   report.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)",
+       "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
+   index = 0;
+   for (NexmarkConfiguration configuration : configurations) {
+     StringBuilder row = new StringBuilder(String.format(" %04d ", index));
+     index++;
+     NexmarkPerf ran = actual.get(configuration);
+     if (ran == null) {
+       row.append("*** not run ***");
+       report.add(row.toString());
+       continue;
+     }
+
+     NexmarkPerf reference = baseline == null ? null : baseline.get(configuration);
+
+     // Runtime, with percentage change relative to the baseline.
+     double runtimeSec = ran.runtimeSec;
+     row.append(String.format("%12.1f ", runtimeSec));
+     if (reference != null) {
+       double referenceRuntimeSec = reference.runtimeSec;
+       double change = ((runtimeSec - referenceRuntimeSec) / referenceRuntimeSec) * 100.0;
+       row.append(String.format("%+11.2f%% ", change));
+     } else {
+       row.append(String.format("%12s ", ""));
+     }
+
+     // Event rate, with percentage change relative to the baseline.
+     double eventsPerSec = ran.eventsPerSec;
+     row.append(String.format("%12.1f ", eventsPerSec));
+     if (reference != null) {
+       double referenceEventsPerSec = reference.eventsPerSec;
+       double change = ((eventsPerSec - referenceEventsPerSec) / referenceEventsPerSec) * 100.0;
+       row.append(String.format("%+11.2f%% ", change));
+     } else {
+       row.append(String.format("%12s ", ""));
+     }
+
+     // Result count, with absolute change relative to the baseline.
+     long numResults = ran.numResults;
+     row.append(String.format("%12d ", numResults));
+     if (reference != null) {
+       long referenceNumResults = reference.numResults;
+       long change = numResults - referenceNumResults;
+       row.append(String.format("%+12d", change));
+     } else {
+       row.append(String.format("%12s", ""));
+     }
+     report.add(row.toString());
+
+     // Errors recorded for this configuration follow its row; a missing list is itself
+     // reported as an error.
+     List<String> errors = ran.errors;
+     if (errors == null) {
+       errors = new ArrayList<>();
+       errors.add("NexmarkGoogleRunner returned null errors list");
+     }
+     for (String error : errors) {
+       report.add(String.format(" %4s *** %s ***", "", error));
+     }
+   }
+
+   report.add(LINE);
+   report.add("");
+
+   for (String reportLine : report) {
+     System.out.println(reportLine);
+   }
+
+   if (summaryFilename == null) {
+     return;
+   }
+   try {
+     Files.write(Paths.get(summaryFilename), report, StandardCharsets.UTF_8,
+         StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+   } catch (IOException e) {
+     throw new RuntimeException("Unable to save summary file: ", e);
+   }
+   NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
+ }
+
+ /**
+  * Write all perf data and any baselines to a javascript file which can be used by a
+  * graphing page etc.
+  *
+  * <p>The file declares a single {@code all} array with one entry per configuration. Any
+  * existing file of the same name is overwritten. Nothing is written when
+  * {@code javascriptFilename} is {@code null}.
+  *
+  * @param javascriptFilename file to write, or {@code null} to skip writing
+  * @param configurations configurations to emit, in output order
+  * @param actual perf of the configurations which were run
+  * @param baseline baseline perf, or {@code null} if there is none
+  * @param start instant the run started at
+  * @throws RuntimeException if the file cannot be written
+  */
+ private static void saveJavascript(
+     @Nullable String javascriptFilename,
+     Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
+     @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
+   if (javascriptFilename == null) {
+     return;
+   }
+
+   List<String> script = new ArrayList<>();
+   Duration elapsed = new Duration(start, Instant.now());
+   script.add(String.format("// Run started %s and ran for %s", start, elapsed));
+   script.add("var all = [");
+
+   for (NexmarkConfiguration configuration : configurations) {
+     NexmarkPerf ran = actual.get(configuration);
+     NexmarkPerf reference = baseline == null ? null : baseline.get(configuration);
+
+     script.add(" {");
+     script.add(String.format(" config: %s", configuration));
+     if (ran != null) {
+       script.add(String.format(" ,perf: %s", ran));
+     }
+     if (reference != null) {
+       script.add(String.format(" ,baseline: %s", reference));
+     }
+     script.add(" },");
+   }
+
+   script.add("];");
+
+   try {
+     Files.write(Paths.get(javascriptFilename), script, StandardCharsets.UTF_8,
+         StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+   } catch (IOException e) {
+     throw new RuntimeException("Unable to save javascript file: ", e);
+   }
+   NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
+ }
+
+ /**
+  * Command line entry point: parses and validates {@code args} as {@link NexmarkOptions},
+  * then runs everything through a {@link NexmarkLauncher} built from those options.
+  */
+ public static void main(String[] args) {
+   NexmarkOptions options =
+       PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
+   NexmarkLauncher<NexmarkOptions> launcher = new NexmarkLauncher<>(options);
+   new Main<>().runAll(options, launcher);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
new file mode 100644
index 0000000..f45c387
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Monitor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A monitor of elements with support for later retrieving their metrics.
+ *
+ * @param <T> Type of element we are monitoring.
+ */
+public class Monitor<T extends KnownSize> implements Serializable {
+ private class MonitorDoFn extends DoFn<T, T> {
+ final Counter elementCounter =
+ Metrics.counter(name , prefix + ".elements");
+ final Counter bytesCounter =
+ Metrics.counter(name , prefix + ".bytes");
+ final Distribution startTime =
+ Metrics.distribution(name , prefix + ".startTime");
+ final Distribution endTime =
+ Metrics.distribution(name , prefix + ".endTime");
+ final Distribution startTimestamp =
+ Metrics.distribution(name , prefix + ".startTimestamp");
+ final Distribution endTimestamp =
+ Metrics.distribution(name , prefix + ".endTimestamp");
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ elementCounter.inc();
+ bytesCounter.inc(c.element().sizeInBytes());
+ long now = System.currentTimeMillis();
+ startTime.update(now);
+ endTime.update(now);
+ startTimestamp.update(c.timestamp().getMillis());
+ endTimestamp.update(c.timestamp().getMillis());
+ c.output(c.element());
+ }
+ }
+
+ public final String name;
+ public final String prefix;
+ private final MonitorDoFn doFn;
+ private final PTransform<PCollection<? extends T>, PCollection<T>> transform;
+
+ public Monitor(String name, String prefix) {
+ this.name = name;
+ this.prefix = prefix;
+ doFn = new MonitorDoFn();
+ transform = ParDo.of(doFn);
+ }
+
+ public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
+ return transform;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
new file mode 100644
index 0000000..904fcd5
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkConfiguration.java
@@ -0,0 +1,721 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Configuration controlling how a query is run. May be supplied by command line or
+ * programmatically. We only capture properties which may influence the resulting
+ * pipeline performance, as captured by {@link NexmarkPerf}.
+ */
+public class NexmarkConfiguration implements Serializable {
+ public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
+
+ /** If {@literal true}, include additional debugging and monitoring stats. */
+ @JsonProperty
+ public boolean debug = true;
+
+ /** Which query to run, in [0,9]. */
+ @JsonProperty
+ public int query = 0;
+
+ /** Where events come from. */
+ @JsonProperty
+ public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT;
+
+ /** Where results go to. */
+ @JsonProperty
+ public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
+
+ /**
+ * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
+ * into the overall query pipeline.
+ */
+ @JsonProperty
+ public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED;
+
+ /**
+ * Number of events to generate. If zero, generate as many as possible without overflowing
+ * internal counters etc.
+ */
+ @JsonProperty
+ public long numEvents = 100000;
+
+ /**
+ * Number of event generators to use. Each generates events in its own timeline.
+ */
+ @JsonProperty
+ public int numEventGenerators = 100;
+
+ /**
+ * Shape of event rate curve.
+ */
+ @JsonProperty
+ public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE;
+
+ /**
+ * Initial overall event rate (in {@link #rateUnit}).
+ */
+ @JsonProperty
+ public int firstEventRate = 10000;
+
+ /**
+ * Next overall event rate (in {@link #rateUnit}).
+ */
+ @JsonProperty
+ public int nextEventRate = 10000;
+
+ /**
+ * Unit for rates.
+ */
+ @JsonProperty
+ public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND;
+
+ /**
+ * Overall period of rate shape, in seconds.
+ */
+ @JsonProperty
+ public int ratePeriodSec = 600;
+
+ /**
+ * Time in seconds to preload the subscription with data, at the initial input rate of the
+ * pipeline.
+ */
+ @JsonProperty
+ public int preloadSeconds = 0;
+
+ /**
+ * Timeout for stream pipelines to stop in seconds.
+ */
+ @JsonProperty
+ public int streamTimeout = 240;
+
+ /**
+ * If true, and in streaming mode, generate events only when they are due according to their
+ * timestamp.
+ */
+ @JsonProperty
+ public boolean isRateLimited = false;
+
+ /**
+ * If true, use wallclock time as event time. Otherwise, use a deterministic
+ * time in the past so that multiple runs will see exactly the same event streams
+ * and should thus have exactly the same results.
+ */
+ @JsonProperty
+ public boolean useWallclockEventTime = false;
+
+ /** Average idealized size of a 'new person' event, in bytes. */
+ @JsonProperty
+ public int avgPersonByteSize = 200;
+
+ /** Average idealized size of a 'new auction' event, in bytes. */
+ @JsonProperty
+ public int avgAuctionByteSize = 500;
+
+ /** Average idealized size of a 'bid' event, in bytes. */
+ @JsonProperty
+ public int avgBidByteSize = 100;
+
+ /** Ratio of bids to 'hot' auctions compared to all other auctions. */
+ @JsonProperty
+ public int hotAuctionRatio = 2;
+
+ /** Ratio of auctions for 'hot' sellers compared to all other people. */
+ @JsonProperty
+ public int hotSellersRatio = 4;
+
+ /** Ratio of bids for 'hot' bidders compared to all other people. */
+ @JsonProperty
+ public int hotBiddersRatio = 4;
+
+ /** Window size, in seconds, for queries 3, 5, 7 and 8. */
+ @JsonProperty
+ public long windowSizeSec = 10;
+
+ /** Sliding window period, in seconds, for query 5. */
+ @JsonProperty
+ public long windowPeriodSec = 5;
+
+ /** Number of seconds to hold back events according to their reported timestamp. */
+ @JsonProperty
+ public long watermarkHoldbackSec = 0;
+
+ /** Average number of auctions which should be in flight at any time, per generator. */
+ @JsonProperty
+ public int numInFlightAuctions = 100;
+
+ /** Maximum number of people to consider as active for placing auctions or bids. */
+ @JsonProperty
+ public int numActivePeople = 1000;
+
+ /** Coder strategy to follow. */
+ @JsonProperty
+ public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND;
+
+ /**
+ * Delay, in milliseconds, for each event. This will peg one core for this number
+ * of milliseconds to simulate CPU-bound computation.
+ */
+ @JsonProperty
+ public long cpuDelayMs = 0;
+
+ /**
+ * Extra data, in bytes, to save to persistent state for each event. This will force
+ * i/o all the way to durable storage to simulate an I/O-bound computation.
+ */
+ @JsonProperty
+ public long diskBusyBytes = 0;
+
+ /**
+ * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction.
+ */
+ @JsonProperty
+ public int auctionSkip = 123;
+
+ /**
+ * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum).
+ */
+ @JsonProperty
+ public int fanout = 5;
+
+ /**
+ * Maximum waiting time, in seconds of event time, before personState is cleaned in query 3
+ * (i.e. the maximum time to wait for the auctions related to a person held in state).
+ */
+ @JsonProperty
+ public int maxAuctionsWaitingTime = 600;
+
+ /**
+ * Length of occasional delay to impose on events (in seconds).
+ */
+ @JsonProperty
+ public long occasionalDelaySec = 3;
+
+ /**
+ * Probability that an event will be delayed by {@link #occasionalDelaySec}.
+ */
+ @JsonProperty
+ public double probDelayedEvent = 0.1;
+
+ /**
+ * Maximum size of each log file (in events). For Query10 only.
+ */
+ @JsonProperty
+ public int maxLogEvents = 100_000;
+
+ /**
+ * If true, use pub/sub publish time instead of event time.
+ */
+ @JsonProperty
+ public boolean usePubsubPublishTime = false;
+
+ /**
+ * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies
+ * every 1000 events per generator are emitted in pseudo-random order.
+ */
+ @JsonProperty
+ public long outOfOrderGroupSize = 1;
+
+ /**
+ * Replace any properties of this configuration which have been supplied by the command line.
+ */
+ public void overrideFromOptions(NexmarkOptions options) {
+ if (options.getDebug() != null) {
+ debug = options.getDebug();
+ }
+ if (options.getQuery() != null) {
+ query = options.getQuery();
+ }
+ if (options.getSourceType() != null) {
+ sourceType = options.getSourceType();
+ }
+ if (options.getSinkType() != null) {
+ sinkType = options.getSinkType();
+ }
+ if (options.getPubSubMode() != null) {
+ pubSubMode = options.getPubSubMode();
+ }
+ if (options.getNumEvents() != null) {
+ numEvents = options.getNumEvents();
+ }
+ if (options.getNumEventGenerators() != null) {
+ numEventGenerators = options.getNumEventGenerators();
+ }
+ if (options.getRateShape() != null) {
+ rateShape = options.getRateShape();
+ }
+ if (options.getFirstEventRate() != null) {
+ firstEventRate = options.getFirstEventRate();
+ }
+ if (options.getNextEventRate() != null) {
+ nextEventRate = options.getNextEventRate();
+ }
+ if (options.getRateUnit() != null) {
+ rateUnit = options.getRateUnit();
+ }
+ if (options.getRatePeriodSec() != null) {
+ ratePeriodSec = options.getRatePeriodSec();
+ }
+ if (options.getPreloadSeconds() != null) {
+ preloadSeconds = options.getPreloadSeconds();
+ }
+ if (options.getStreamTimeout() != null) {
+ streamTimeout = options.getStreamTimeout();
+ }
+ if (options.getIsRateLimited() != null) {
+ isRateLimited = options.getIsRateLimited();
+ }
+ if (options.getUseWallclockEventTime() != null) {
+ useWallclockEventTime = options.getUseWallclockEventTime();
+ }
+ if (options.getAvgPersonByteSize() != null) {
+ avgPersonByteSize = options.getAvgPersonByteSize();
+ }
+ if (options.getAvgAuctionByteSize() != null) {
+ avgAuctionByteSize = options.getAvgAuctionByteSize();
+ }
+ if (options.getAvgBidByteSize() != null) {
+ avgBidByteSize = options.getAvgBidByteSize();
+ }
+ if (options.getHotAuctionRatio() != null) {
+ hotAuctionRatio = options.getHotAuctionRatio();
+ }
+ if (options.getHotSellersRatio() != null) {
+ hotSellersRatio = options.getHotSellersRatio();
+ }
+ if (options.getHotBiddersRatio() != null) {
+ hotBiddersRatio = options.getHotBiddersRatio();
+ }
+ if (options.getWindowSizeSec() != null) {
+ windowSizeSec = options.getWindowSizeSec();
+ }
+ if (options.getWindowPeriodSec() != null) {
+ windowPeriodSec = options.getWindowPeriodSec();
+ }
+ if (options.getWatermarkHoldbackSec() != null) {
+ watermarkHoldbackSec = options.getWatermarkHoldbackSec();
+ }
+ if (options.getNumInFlightAuctions() != null) {
+ numInFlightAuctions = options.getNumInFlightAuctions();
+ }
+ if (options.getNumActivePeople() != null) {
+ numActivePeople = options.getNumActivePeople();
+ }
+ if (options.getCoderStrategy() != null) {
+ coderStrategy = options.getCoderStrategy();
+ }
+ if (options.getCpuDelayMs() != null) {
+ cpuDelayMs = options.getCpuDelayMs();
+ }
+ if (options.getDiskBusyBytes() != null) {
+ diskBusyBytes = options.getDiskBusyBytes();
+ }
+ if (options.getAuctionSkip() != null) {
+ auctionSkip = options.getAuctionSkip();
+ }
+ if (options.getFanout() != null) {
+ fanout = options.getFanout();
+ }
+ if (options.getMaxAuctionsWaitingTime() != null) {
+ fanout = options.getMaxAuctionsWaitingTime();
+ }
+ if (options.getOccasionalDelaySec() != null) {
+ occasionalDelaySec = options.getOccasionalDelaySec();
+ }
+ if (options.getProbDelayedEvent() != null) {
+ probDelayedEvent = options.getProbDelayedEvent();
+ }
+ if (options.getMaxLogEvents() != null) {
+ maxLogEvents = options.getMaxLogEvents();
+ }
+ if (options.getUsePubsubPublishTime() != null) {
+ usePubsubPublishTime = options.getUsePubsubPublishTime();
+ }
+ if (options.getOutOfOrderGroupSize() != null) {
+ outOfOrderGroupSize = options.getOutOfOrderGroupSize();
+ }
+ }
+
+ /**
+  * Return a new configuration holding the same value as this one for every field.
+  *
+  * @return an independent copy; changing it does not affect this configuration
+  */
+ public NexmarkConfiguration copy() {
+   NexmarkConfiguration duplicate = new NexmarkConfiguration();
+
+   // Query selection and I/O.
+   duplicate.debug = this.debug;
+   duplicate.query = this.query;
+   duplicate.sourceType = this.sourceType;
+   duplicate.sinkType = this.sinkType;
+   duplicate.pubSubMode = this.pubSubMode;
+
+   // Event generation volume, rate and timing.
+   duplicate.numEvents = this.numEvents;
+   duplicate.numEventGenerators = this.numEventGenerators;
+   duplicate.rateShape = this.rateShape;
+   duplicate.firstEventRate = this.firstEventRate;
+   duplicate.nextEventRate = this.nextEventRate;
+   duplicate.rateUnit = this.rateUnit;
+   duplicate.ratePeriodSec = this.ratePeriodSec;
+   duplicate.preloadSeconds = this.preloadSeconds;
+   duplicate.streamTimeout = this.streamTimeout;
+   duplicate.isRateLimited = this.isRateLimited;
+   duplicate.useWallclockEventTime = this.useWallclockEventTime;
+
+   // Event sizes and hot-key ratios.
+   duplicate.avgPersonByteSize = this.avgPersonByteSize;
+   duplicate.avgAuctionByteSize = this.avgAuctionByteSize;
+   duplicate.avgBidByteSize = this.avgBidByteSize;
+   duplicate.hotAuctionRatio = this.hotAuctionRatio;
+   duplicate.hotSellersRatio = this.hotSellersRatio;
+   duplicate.hotBiddersRatio = this.hotBiddersRatio;
+
+   // Windowing and watermarks.
+   duplicate.windowSizeSec = this.windowSizeSec;
+   duplicate.windowPeriodSec = this.windowPeriodSec;
+   duplicate.watermarkHoldbackSec = this.watermarkHoldbackSec;
+
+   // Everything else.
+   duplicate.numInFlightAuctions = this.numInFlightAuctions;
+   duplicate.numActivePeople = this.numActivePeople;
+   duplicate.coderStrategy = this.coderStrategy;
+   duplicate.cpuDelayMs = this.cpuDelayMs;
+   duplicate.diskBusyBytes = this.diskBusyBytes;
+   duplicate.auctionSkip = this.auctionSkip;
+   duplicate.fanout = this.fanout;
+   duplicate.maxAuctionsWaitingTime = this.maxAuctionsWaitingTime;
+   duplicate.occasionalDelaySec = this.occasionalDelaySec;
+   duplicate.probDelayedEvent = this.probDelayedEvent;
+   duplicate.maxLogEvents = this.maxLogEvents;
+   duplicate.usePubsubPublishTime = this.usePubsubPublishTime;
+   duplicate.outOfOrderGroupSize = this.outOfOrderGroupSize;
+   return duplicate;
+ }
+
+ /**
+  * Return short description of configuration (suitable for use in logging). We only render
+  * the core fields plus those which do not have default values.
+  *
+  * <p>The query is always rendered. Every other field is rendered as {@code "; name:value"}
+  * only when it differs from {@link #DEFAULT}, except that {@code firstEventRate} and
+  * {@code nextEventRate} are rendered together as soon as either one differs.
+  *
+  * @return the description, with fields in declaration order
+  */
+ public String toShortString() {
+   StringBuilder sb = new StringBuilder();
+   sb.append(String.format("query:%d", query));
+   if (debug != DEFAULT.debug) {
+     sb.append(String.format("; debug:%s", debug));
+   }
+   if (sourceType != DEFAULT.sourceType) {
+     sb.append(String.format("; sourceType:%s", sourceType));
+   }
+   if (sinkType != DEFAULT.sinkType) {
+     sb.append(String.format("; sinkType:%s", sinkType));
+   }
+   if (pubSubMode != DEFAULT.pubSubMode) {
+     sb.append(String.format("; pubSubMode:%s", pubSubMode));
+   }
+   if (numEvents != DEFAULT.numEvents) {
+     sb.append(String.format("; numEvents:%d", numEvents));
+   }
+   if (numEventGenerators != DEFAULT.numEventGenerators) {
+     sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
+   }
+   if (rateShape != DEFAULT.rateShape) {
+     sb.append(String.format("; rateShape:%s", rateShape));
+   }
+   if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) {
+     sb.append(String.format("; firstEventRate:%d", firstEventRate));
+     sb.append(String.format("; nextEventRate:%d", nextEventRate));
+   }
+   if (rateUnit != DEFAULT.rateUnit) {
+     sb.append(String.format("; rateUnit:%s", rateUnit));
+   }
+   if (ratePeriodSec != DEFAULT.ratePeriodSec) {
+     sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec));
+   }
+   if (preloadSeconds != DEFAULT.preloadSeconds) {
+     sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
+   }
+   if (streamTimeout != DEFAULT.streamTimeout) {
+     sb.append(String.format("; streamTimeout:%d", streamTimeout));
+   }
+   if (isRateLimited != DEFAULT.isRateLimited) {
+     sb.append(String.format("; isRateLimited:%s", isRateLimited));
+   }
+   if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
+     sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
+   }
+   if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
+     sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
+   }
+   if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
+     sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
+   }
+   if (avgBidByteSize != DEFAULT.avgBidByteSize) {
+     sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
+   }
+   if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
+     sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
+   }
+   if (hotSellersRatio != DEFAULT.hotSellersRatio) {
+     sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
+   }
+   if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
+     sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
+   }
+   if (windowSizeSec != DEFAULT.windowSizeSec) {
+     sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
+   }
+   if (windowPeriodSec != DEFAULT.windowPeriodSec) {
+     sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
+   }
+   if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
+     sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
+   }
+   if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
+     sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
+   }
+   if (numActivePeople != DEFAULT.numActivePeople) {
+     sb.append(String.format("; numActivePeople:%d", numActivePeople));
+   }
+   if (coderStrategy != DEFAULT.coderStrategy) {
+     sb.append(String.format("; coderStrategy:%s", coderStrategy));
+   }
+   if (cpuDelayMs != DEFAULT.cpuDelayMs) {
+     // Label now matches the field name (was "cpuSlowdownMs").
+     sb.append(String.format("; cpuDelayMs:%d", cpuDelayMs));
+   }
+   if (diskBusyBytes != DEFAULT.diskBusyBytes) {
+     // Label now matches the field name (was misspelled "diskBuysBytes").
+     sb.append(String.format("; diskBusyBytes:%d", diskBusyBytes));
+   }
+   if (auctionSkip != DEFAULT.auctionSkip) {
+     sb.append(String.format("; auctionSkip:%d", auctionSkip));
+   }
+   if (fanout != DEFAULT.fanout) {
+     sb.append(String.format("; fanout:%d", fanout));
+   }
+   if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
+     // Render the field itself; the value of fanout used to be printed under this label.
+     sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
+   }
+   if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
+     sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
+   }
+   if (probDelayedEvent != DEFAULT.probDelayedEvent) {
+     sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
+   }
+   if (maxLogEvents != DEFAULT.maxLogEvents) {
+     sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
+   }
+   if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
+     sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
+   }
+   if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
+     sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
+   }
+   return sb.toString();
+ }
+
+ /**
+ * Return full description as a string.
+ */
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Parse an object from {@code string}.
+ */
+ public static NexmarkConfiguration fromString(String string) {
+ try {
+ return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to parse nexmark configuration: ", e);
+ }
+ }
+
+ /** Hash over every configuration field, combined in declaration order. */
+ @Override
+ public int hashCode() {
+   return Objects.hash(
+       // Query selection and I/O.
+       debug, query, sourceType, sinkType, pubSubMode,
+       // Event generation volume, rate and timing.
+       numEvents, numEventGenerators, rateShape, firstEventRate, nextEventRate, rateUnit,
+       ratePeriodSec, preloadSeconds, streamTimeout, isRateLimited, useWallclockEventTime,
+       // Event sizes and hot-key ratios.
+       avgPersonByteSize, avgAuctionByteSize, avgBidByteSize,
+       hotAuctionRatio, hotSellersRatio, hotBiddersRatio,
+       // Windowing and watermarks.
+       windowSizeSec, windowPeriodSec, watermarkHoldbackSec,
+       // Everything else.
+       numInFlightAuctions, numActivePeople, coderStrategy, cpuDelayMs, diskBusyBytes,
+       auctionSkip, fanout, maxAuctionsWaitingTime, occasionalDelaySec, probDelayedEvent,
+       maxLogEvents, usePubsubPublishTime, outOfOrderGroupSize);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ NexmarkConfiguration other = (NexmarkConfiguration) obj;
+ if (debug != other.debug) {
+ return false;
+ }
+ if (auctionSkip != other.auctionSkip) {
+ return false;
+ }
+ if (avgAuctionByteSize != other.avgAuctionByteSize) {
+ return false;
+ }
+ if (avgBidByteSize != other.avgBidByteSize) {
+ return false;
+ }
+ if (avgPersonByteSize != other.avgPersonByteSize) {
+ return false;
+ }
+ if (coderStrategy != other.coderStrategy) {
+ return false;
+ }
+ if (cpuDelayMs != other.cpuDelayMs) {
+ return false;
+ }
+ if (diskBusyBytes != other.diskBusyBytes) {
+ return false;
+ }
+ if (fanout != other.fanout) {
+ return false;
+ }
+ if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
+ return false;
+ }
+ if (firstEventRate != other.firstEventRate) {
+ return false;
+ }
+ if (hotAuctionRatio != other.hotAuctionRatio) {
+ return false;
+ }
+ if (hotBiddersRatio != other.hotBiddersRatio) {
+ return false;
+ }
+ if (hotSellersRatio != other.hotSellersRatio) {
+ return false;
+ }
+ if (isRateLimited != other.isRateLimited) {
+ return false;
+ }
+ if (maxLogEvents != other.maxLogEvents) {
+ return false;
+ }
+ if (nextEventRate != other.nextEventRate) {
+ return false;
+ }
+ if (rateUnit != other.rateUnit) {
+ return false;
+ }
+ if (numEventGenerators != other.numEventGenerators) {
+ return false;
+ }
+ if (numEvents != other.numEvents) {
+ return false;
+ }
+ if (numInFlightAuctions != other.numInFlightAuctions) {
+ return false;
+ }
+ if (numActivePeople != other.numActivePeople) {
+ return false;
+ }
+ if (occasionalDelaySec != other.occasionalDelaySec) {
+ return false;
+ }
+ if (preloadSeconds != other.preloadSeconds) {
+ return false;
+ }
+ if (streamTimeout != other.streamTimeout) {
+ return false;
+ }
+ if (Double.doubleToLongBits(probDelayedEvent)
+ != Double.doubleToLongBits(other.probDelayedEvent)) {
+ return false;
+ }
+ if (pubSubMode != other.pubSubMode) {
+ return false;
+ }
+ if (ratePeriodSec != other.ratePeriodSec) {
+ return false;
+ }
+ if (rateShape != other.rateShape) {
+ return false;
+ }
+ if (query != other.query) {
+ return false;
+ }
+ if (sinkType != other.sinkType) {
+ return false;
+ }
+ if (sourceType != other.sourceType) {
+ return false;
+ }
+ if (useWallclockEventTime != other.useWallclockEventTime) {
+ return false;
+ }
+ if (watermarkHoldbackSec != other.watermarkHoldbackSec) {
+ return false;
+ }
+ if (windowPeriodSec != other.windowPeriodSec) {
+ return false;
+ }
+ if (windowSizeSec != other.windowSizeSec) {
+ return false;
+ }
+ if (usePubsubPublishTime != other.usePubsubPublishTime) {
+ return false;
+ }
+ if (outOfOrderGroupSize != other.outOfOrderGroupSize) {
+ return false;
+ }
+ return true;
+ }
+}