You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:10:00 UTC
[52/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
Move module beam-integration-java-nexmark to beam-sdks-java-nexmark
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4333df7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4333df7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4333df7
Branch: refs/heads/master
Commit: f4333df77267d5207f0f23ae62e79b171a00e8a7
Parents: 2f9b494
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Thu Jun 15 11:55:26 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:29 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/README.md | 340 -----
integration/java/nexmark/pom.xml | 292 -----
.../apache/beam/integration/nexmark/Main.java | 304 -----
.../beam/integration/nexmark/Monitor.java | 79 --
.../nexmark/NexmarkConfiguration.java | 721 -----------
.../integration/nexmark/NexmarkLauncher.java | 1158 ------------------
.../integration/nexmark/NexmarkOptions.java | 403 ------
.../beam/integration/nexmark/NexmarkPerf.java | 208 ----
.../beam/integration/nexmark/NexmarkSuite.java | 112 --
.../beam/integration/nexmark/NexmarkUtils.java | 672 ----------
.../beam/integration/nexmark/model/Auction.java | 187 ---
.../integration/nexmark/model/AuctionBid.java | 84 --
.../integration/nexmark/model/AuctionCount.java | 84 --
.../integration/nexmark/model/AuctionPrice.java | 88 --
.../beam/integration/nexmark/model/Bid.java | 177 ---
.../nexmark/model/BidsPerSession.java | 87 --
.../nexmark/model/CategoryPrice.java | 97 --
.../beam/integration/nexmark/model/Done.java | 80 --
.../beam/integration/nexmark/model/Event.java | 171 ---
.../nexmark/model/IdNameReserve.java | 98 --
.../integration/nexmark/model/KnownSize.java | 26 -
.../nexmark/model/NameCityStateId.java | 103 --
.../beam/integration/nexmark/model/Person.java | 163 ---
.../integration/nexmark/model/SellerPrice.java | 89 --
.../integration/nexmark/model/package-info.java | 22 -
.../beam/integration/nexmark/package-info.java | 21 -
.../nexmark/queries/AbstractSimulator.java | 211 ----
.../nexmark/queries/NexmarkQuery.java | 270 ----
.../nexmark/queries/NexmarkQueryModel.java | 118 --
.../integration/nexmark/queries/Query0.java | 71 --
.../nexmark/queries/Query0Model.java | 64 -
.../integration/nexmark/queries/Query1.java | 67 -
.../integration/nexmark/queries/Query10.java | 367 ------
.../integration/nexmark/queries/Query11.java | 79 --
.../integration/nexmark/queries/Query12.java | 80 --
.../nexmark/queries/Query1Model.java | 76 --
.../integration/nexmark/queries/Query2.java | 79 --
.../nexmark/queries/Query2Model.java | 80 --
.../integration/nexmark/queries/Query3.java | 301 -----
.../nexmark/queries/Query3Model.java | 124 --
.../integration/nexmark/queries/Query4.java | 116 --
.../nexmark/queries/Query4Model.java | 186 ---
.../integration/nexmark/queries/Query5.java | 138 ---
.../nexmark/queries/Query5Model.java | 176 ---
.../integration/nexmark/queries/Query6.java | 155 ---
.../nexmark/queries/Query6Model.java | 133 --
.../integration/nexmark/queries/Query7.java | 90 --
.../nexmark/queries/Query7Model.java | 130 --
.../integration/nexmark/queries/Query8.java | 97 --
.../nexmark/queries/Query8Model.java | 148 ---
.../integration/nexmark/queries/Query9.java | 44 -
.../nexmark/queries/Query9Model.java | 44 -
.../nexmark/queries/WinningBids.java | 412 -------
.../nexmark/queries/WinningBidsSimulator.java | 206 ----
.../nexmark/queries/package-info.java | 22 -
.../nexmark/sources/BoundedEventSource.java | 190 ---
.../integration/nexmark/sources/Generator.java | 609 ---------
.../nexmark/sources/GeneratorConfig.java | 301 -----
.../nexmark/sources/UnboundedEventSource.java | 330 -----
.../nexmark/sources/package-info.java | 22 -
.../nexmark/src/main/resources/log4j.properties | 55 -
.../integration/nexmark/queries/QueryTest.java | 185 ---
.../nexmark/sources/BoundedEventSourceTest.java | 71 --
.../nexmark/sources/GeneratorTest.java | 111 --
.../sources/UnboundedEventSourceTest.java | 107 --
integration/java/pom.xml | 37 -
integration/pom.xml | 51 -
pom.xml | 1 -
sdks/java/nexmark/README.md | 340 +++++
sdks/java/nexmark/pom.xml | 292 +++++
.../java/org/apache/beam/sdk/nexmark/Main.java | 303 +++++
.../org/apache/beam/sdk/nexmark/Monitor.java | 78 ++
.../beam/sdk/nexmark/NexmarkConfiguration.java | 721 +++++++++++
.../beam/sdk/nexmark/NexmarkLauncher.java | 1157 +++++++++++++++++
.../apache/beam/sdk/nexmark/NexmarkOptions.java | 403 ++++++
.../apache/beam/sdk/nexmark/NexmarkPerf.java | 207 ++++
.../apache/beam/sdk/nexmark/NexmarkSuite.java | 112 ++
.../apache/beam/sdk/nexmark/NexmarkUtils.java | 674 ++++++++++
.../apache/beam/sdk/nexmark/model/Auction.java | 187 +++
.../beam/sdk/nexmark/model/AuctionBid.java | 85 ++
.../beam/sdk/nexmark/model/AuctionCount.java | 84 ++
.../beam/sdk/nexmark/model/AuctionPrice.java | 88 ++
.../org/apache/beam/sdk/nexmark/model/Bid.java | 177 +++
.../beam/sdk/nexmark/model/BidsPerSession.java | 87 ++
.../beam/sdk/nexmark/model/CategoryPrice.java | 97 ++
.../org/apache/beam/sdk/nexmark/model/Done.java | 80 ++
.../apache/beam/sdk/nexmark/model/Event.java | 171 +++
.../beam/sdk/nexmark/model/IdNameReserve.java | 98 ++
.../beam/sdk/nexmark/model/KnownSize.java | 26 +
.../beam/sdk/nexmark/model/NameCityStateId.java | 103 ++
.../apache/beam/sdk/nexmark/model/Person.java | 163 +++
.../beam/sdk/nexmark/model/SellerPrice.java | 89 ++
.../beam/sdk/nexmark/model/package-info.java | 22 +
.../apache/beam/sdk/nexmark/package-info.java | 21 +
.../sdk/nexmark/queries/AbstractSimulator.java | 211 ++++
.../beam/sdk/nexmark/queries/NexmarkQuery.java | 270 ++++
.../sdk/nexmark/queries/NexmarkQueryModel.java | 118 ++
.../apache/beam/sdk/nexmark/queries/Query0.java | 70 ++
.../beam/sdk/nexmark/queries/Query0Model.java | 64 +
.../apache/beam/sdk/nexmark/queries/Query1.java | 67 +
.../beam/sdk/nexmark/queries/Query10.java | 367 ++++++
.../beam/sdk/nexmark/queries/Query11.java | 79 ++
.../beam/sdk/nexmark/queries/Query12.java | 80 ++
.../beam/sdk/nexmark/queries/Query1Model.java | 76 ++
.../apache/beam/sdk/nexmark/queries/Query2.java | 79 ++
.../beam/sdk/nexmark/queries/Query2Model.java | 80 ++
.../apache/beam/sdk/nexmark/queries/Query3.java | 301 +++++
.../beam/sdk/nexmark/queries/Query3Model.java | 124 ++
.../apache/beam/sdk/nexmark/queries/Query4.java | 116 ++
.../beam/sdk/nexmark/queries/Query4Model.java | 186 +++
.../apache/beam/sdk/nexmark/queries/Query5.java | 138 +++
.../beam/sdk/nexmark/queries/Query5Model.java | 176 +++
.../apache/beam/sdk/nexmark/queries/Query6.java | 155 +++
.../beam/sdk/nexmark/queries/Query6Model.java | 133 ++
.../apache/beam/sdk/nexmark/queries/Query7.java | 90 ++
.../beam/sdk/nexmark/queries/Query7Model.java | 130 ++
.../apache/beam/sdk/nexmark/queries/Query8.java | 97 ++
.../beam/sdk/nexmark/queries/Query8Model.java | 148 +++
.../apache/beam/sdk/nexmark/queries/Query9.java | 44 +
.../beam/sdk/nexmark/queries/Query9Model.java | 44 +
.../beam/sdk/nexmark/queries/WinningBids.java | 412 +++++++
.../nexmark/queries/WinningBidsSimulator.java | 206 ++++
.../beam/sdk/nexmark/queries/package-info.java | 22 +
.../sdk/nexmark/sources/BoundedEventSource.java | 190 +++
.../beam/sdk/nexmark/sources/Generator.java | 609 +++++++++
.../sdk/nexmark/sources/GeneratorConfig.java | 298 +++++
.../nexmark/sources/UnboundedEventSource.java | 329 +++++
.../beam/sdk/nexmark/sources/package-info.java | 22 +
.../nexmark/src/main/resources/log4j.properties | 55 +
.../beam/sdk/nexmark/queries/QueryTest.java | 185 +++
.../nexmark/sources/BoundedEventSourceTest.java | 70 ++
.../beam/sdk/nexmark/sources/GeneratorTest.java | 110 ++
.../sources/UnboundedEventSourceTest.java | 105 ++
sdks/java/pom.xml | 1 +
134 files changed, 11922 insertions(+), 12020 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
deleted file mode 100644
index a9acd63..0000000
--- a/integration/java/nexmark/README.md
+++ /dev/null
@@ -1,340 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-# NEXMark integration suite
-
-This is a suite of pipelines inspired by the 'continuous data stream'
-queries in the
-[NEXMark benchmark](http://datalab.cs.pdx.edu/niagaraST/NEXMark/).
-
-These are multiple queries over a three-entity model representing an online auction system:
-
- - **Person** represents a person submitting an item for auction and/or making a bid
- on an auction.
- - **Auction** represents an item under auction.
- - **Bid** represents a bid for an item under auction.
-
-The queries exercise many aspects of the Beam model:
-
-* **Query1**: What are the bid values in euros?
- Illustrates a simple map.
-* **Query2**: What are the auctions with particular auction numbers?
- Illustrates a simple filter.
-* **Query3**: Who is selling in particular US states?
- Illustrates an incremental join (using per-key state and timer) and filter.
-* **Query4**: What is the average selling price for each auction
- category?
- Illustrates complex join (using custom window functions) and
- aggregation.
-* **Query5**: Which auctions have seen the most bids in the last period?
- Illustrates sliding windows and combiners.
-* **Query6**: What is the average selling price per seller for their
- last 10 closed auctions.
- Shares the same 'winning bids' core as for **Query4**, and
- illustrates a specialized combiner.
-* **Query7**: What are the highest bids per period?
- Deliberately implemented using a side input to illustrate fanout.
-* **Query8**: Who has entered the system and created an auction in
- the last period?
- Illustrates a simple join.
-
-We have augmented the original queries with five more:
-
-* **Query0**: Pass-through.
- Allows us to measure the monitoring overhead.
-* **Query9**: Winning-bids.
- A common sub-query shared by **Query4** and **Query6**.
-* **Query10**: Log all events to GCS files.
- Illustrates windows with large side effects on firing.
-* **Query11**: How many bids did a user make in each session they
- were active?
- Illustrates session windows.
-* **Query12**: How many bids does a user make within a fixed
- processing time limit?
- Illustrates working in processing time in the Global window, as
- compared with event time in non-Global windows for all the other
- queries.
-
-The Beam runner to use is selected with Maven profiles. The available profiles are:
-
-* direct-runner
-* spark-runner
-* flink-runner
-* apex-runner
-
-The runner must also be specified, as in any other Beam pipeline, using
-
- --runner
-
-
-Test data is deterministically synthesized on demand. The test
-data may be synthesized in the same pipeline as the query itself,
-or may be published to Pubsub.
-
-The query results may be:
-
-* Published to Pubsub.
-* Written to text files as plain text.
-* Written to text files using an Avro encoding.
-* Sent to BigQuery.
-* Discarded.
-
-# Configuration
-
-## Common configuration parameters
-
-Decide if batch or streaming:
-
- --streaming=true
-
-Number of event generators:
-
- --numEventGenerators=4
-
-Run query N
-
- --query=N
-
-## Available Suites
-The suite to run can be chosen using this configuration parameter:
-
- --suite=SUITE
-
-Available suites are:
-* DEFAULT: Test default configuration with query 0.
-* SMOKE: Run the 12 default configurations.
-* STRESS: Like SMOKE but for 1m events.
-* FULL_THROTTLE: Like SMOKE but 100m events.
-
-
-
-## Apex specific configuration
-
- --manageResources=false --monitorJobs=false
-
-## Dataflow specific configuration
-
- --manageResources=false --monitorJobs=true \
- --enforceEncodability=false --enforceImmutability=false \
- --project=<your project> \
- --zone=<your zone> \
- --workerMachineType=n1-highmem-8 \
- --stagingLocation=<a gs path for staging> \
- --runner=DataflowRunner \
- --tempLocation=gs://talend-imejia/nexmark/temp/ \
- --stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
- --filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar
-
-## Direct specific configuration
-
- --manageResources=false --monitorJobs=true \
- --enforceEncodability=false --enforceImmutability=false
-
-## Flink specific configuration
-
- --manageResources=false --monitorJobs=true \
- --flinkMaster=local --parallelism=#numcores
-
-## Spark specific configuration
-
- --manageResources=false --monitorJobs=true \
- --sparkMaster=local \
- -Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
-
-# Current Status
-
-Open issues are tracked in the [Apache Beam JIRA](https://issues.apache.org/jira/browse/BEAM):
-
-## Batch / Synthetic / Local
-
-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
-| 0 | ok | ok | ok | ok |
-| 1 | ok | ok | ok | ok |
-| 2 | ok | ok | ok | ok |
-| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
-| 4 | ok | ok | ok | ok |
-| 5 | ok | ok | ok | ok |
-| 6 | ok | ok | ok | ok |
-| 7 | ok | ok | ok | ok |
-| 8 | ok | ok | ok | ok |
-| 9 | ok | ok | ok | ok |
-| 10 | ok | ok | ok | ok |
-| 11 | ok | ok | ok | ok |
-| 12 | ok | ok | ok | ok |
-
-## Streaming / Synthetic / Local
-
-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
-| 0 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 1 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 2 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
-| 4 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 5 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 6 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 7 | ok | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 8 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 9 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 10 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 11 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-| 12 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
-
-## Batch / Synthetic / Cluster
-
-TODO
-
-| Query | Dataflow | Spark | Flink | Apex |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
-| 0 | | | | |
-
-## Streaming / Synthetic / Cluster
-
-TODO
-
-| Query | Dataflow | Spark | Flink | Apex |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
-| 0 | | | | |
-
-# Running Nexmark
-
-## Running SMOKE suite on the DirectRunner (local)
-
-Batch Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
-
-Streaming Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
-
-
-## Running SMOKE suite on the SparkRunner (local)
-
-Batch Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
-
-Streaming Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
-
-
-## Running SMOKE suite on the FlinkRunner (local)
-
-Batch Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local"
-
-Streaming Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local"
-
-
-## Running SMOKE suite on the ApexRunner (local)
-
-Batch Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
-
-Streaming Mode
-
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
-
-
-## Running SMOKE suite on Google Cloud Dataflow
-
-Building package
-
- mvn clean package -Pdataflow-runner
-
-Submit to Google Dataflow service
-
-
-```
-java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
- org.apache.beam.integration.nexmark.Main \
- --runner=DataflowRunner \
- --project=<your project> \
- --zone=<your zone> \
- --workerMachineType=n1-highmem-8 \
- --stagingLocation=<a gs path for staging> \
- --streaming=true \
- --sourceType=PUBSUB \
- --pubSubMode=PUBLISH_ONLY \
- --pubsubTopic=<an existing Pubsub topic> \
- --resourceNameMode=VERBATIM \
- --manageResources=false \
- --monitorJobs=false \
- --numEventGenerators=64 \
- --numWorkers=16 \
- --maxNumWorkers=16 \
- --suite=SMOKE \
- --firstEventRate=100000 \
- --nextEventRate=100000 \
- --ratePeriodSec=3600 \
- --isRateLimited=true \
- --avgPersonByteSize=500 \
- --avgAuctionByteSize=500 \
- --avgBidByteSize=500 \
- --probDelayedEvent=0.000001 \
- --occasionalDelaySec=3600 \
- --numEvents=0 \
- --useWallclockEventTime=true \
- --usePubsubPublishTime=true \
- --experiments=enable_custom_pubsub_sink
-```
-
-```
-java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
- org.apache.beam.integration.nexmark.Main \
- --runner=DataflowRunner \
- --project=<your project> \
- --zone=<your zone> \
- --workerMachineType=n1-highmem-8 \
- --stagingLocation=<a gs path for staging> \
- --streaming=true \
- --sourceType=PUBSUB \
- --pubSubMode=SUBSCRIBE_ONLY \
- --pubsubSubscription=<an existing Pubsub subscription to above topic> \
- --resourceNameMode=VERBATIM \
- --manageResources=false \
- --monitorJobs=false \
- --numWorkers=64 \
- --maxNumWorkers=64 \
- --suite=SMOKE \
- --usePubsubPublishTime=true \
- --outputPath=<a gs path under which log files will be written> \
- --windowSizeSec=600 \
- --occasionalDelaySec=3600 \
- --maxLogEvents=10000 \
- --experiments=enable_custom_pubsub_source
-```
-
-## Running query 0 on a Spark cluster with yarn
-
-Building package
-
- mvn clean package -Pspark-runner
-
-Submit to the cluster
-
- spark-submit --master yarn-client --class org.apache.beam.integration.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true
-
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
deleted file mode 100644
index 664a410..0000000
--- a/integration/java/nexmark/pom.xml
+++ /dev/null
@@ -1,292 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-integration-java-parent</artifactId>
- <version>2.1.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <artifactId>beam-integration-java-nexmark</artifactId>
- <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
- <packaging>jar</packaging>
-
- <profiles>
-
- <!--
- The direct runner is available by default.
- You can also include it on the classpath explicitly with -P direct-runner
- -->
- <profile>
- <id>direct-runner</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
- </profile>
-
- <!-- Include the Apache Apex runner with -P apex-runner -->
- <profile>
- <id>apex-runner</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-apex</artifactId>
- <scope>runtime</scope>
- </dependency>
- <!--
- Apex depends on httpclient version 4.3.5, project has a transitive dependency to httpclient 4.0.1 from
- google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
- can be removed when the project no longer has a dependency on a different httpclient version.
- -->
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.3.5</version>
- <scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- </profile>
-
- <!-- Include the Apache Flink runner with -P flink-runner -->
- <profile>
- <id>flink-runner</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-flink_2.10</artifactId>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
- </profile>
-
- <!-- Include the Apache Spark runner with -P spark-runner -->
- <profile>
- <id>spark-runner</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-spark</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>${spark.version}</version>
- <scope>runtime</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jul-to-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- </profile>
-
- <!-- Include the Google Cloud Dataflow runner with -P dataflow-runner -->
- <profile>
- <id>dataflow-runner</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
- <scope>runtime</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <finalName>${project.artifactId}-bundled-${project.version}</finalName>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Avro plugin for automatic code generation -->
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>schemas</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>schema</goal>
- </goals>
- <configuration>
- <sourceDirectory>${project.basedir}/src/main/</sourceDirectory>
- <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <!-- Coverage analysis for unit tests. -->
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <!-- Java SDK -->
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- </dependency>
-
- <!-- IOs -->
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-bigquery</artifactId>
- </dependency>
-
- <!-- Extra libraries -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <scope>runtime</scope>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
- </dependency>
-
- <!-- Test -->
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-direct-java</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
deleted file mode 100644
index 4c23651..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Main.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * An implementation of the 'NEXMark queries' for Beam.
- * These are multiple queries over a three table schema representing an online auction system:
- * <ul>
- * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
- * on an auction.
- * <li>{@link Auction} represents an item under auction.
- * <li>{@link Bid} represents a bid for an item under auction.
- * </ul>
- * The queries exercise many aspects of the Beam model.
- *
- * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
- * particularly sensible.
- *
- * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
- * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
- */
-public class Main<OptionT extends NexmarkOptions> {
-
-  /**
-   * Entry point: runs every configuration of the selected suite and reports the results.
-   *
-   * <p>Each configuration from {@code options.getSuite()} is run with {@code nexmarkLauncher}.
-   * Each non-null result is appended to the perf file (if one is configured), and a running
-   * summary is printed to stdout. When {@code options.getMonitorJobs()} is set, the final
-   * summary and the javascript data file are produced in a {@code finally}, so they are still
-   * emitted if a run throws.
-   *
-   * @param options pipeline options selecting the suite and the output files
-   * @param nexmarkLauncher launcher used to run each configuration
-   * @throws RuntimeException if any run reported errors (a {@code null} error list counts as one)
-   */
-  void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) {
-    Instant start = Instant.now();
-    Map<NexmarkConfiguration, NexmarkPerf> baseline = loadBaseline(options.getBaselineFilename());
-    Map<NexmarkConfiguration, NexmarkPerf> actual = new LinkedHashMap<>();
-    Iterable<NexmarkConfiguration> configurations = options.getSuite().getConfigurations(options);
-
-    boolean successful = true;
-    try {
-      // Run all the configurations.
-      for (NexmarkConfiguration configuration : configurations) {
-        NexmarkPerf perf = nexmarkLauncher.run(configuration);
-        if (perf != null) {
-          // A missing error list is treated as a failure, just like a non-empty one.
-          if (perf.errors == null || !perf.errors.isEmpty()) {
-            successful = false;
-          }
-          appendPerf(options.getPerfFilename(), configuration, perf);
-          actual.put(configuration, perf);
-          // Summarize what we've run so far (stdout only: no summary file is passed).
-          saveSummary(null, configurations, actual, baseline, start);
-        }
-      }
-    } finally {
-      if (options.getMonitorJobs()) {
-        // Report overall performance.
-        saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start);
-        saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start);
-      }
-    }
-
-    if (!successful) {
-      throw new RuntimeException("Execution was not successful");
-    }
-  }
-
-  /**
-   * Append the pair of {@code configuration} and {@code perf} to the perf file.
-   *
-   * <p>Each entry is written as five lines:
-   * <ul>
-   * <li>a blank line;
-   * <li>a {@code #} line with the current time;
-   * <li>a {@code #} line with {@link NexmarkConfiguration#toShortString()};
-   * <li>the configuration's {@code toString()};
-   * <li>the perf's {@code toString()}.
-   * </ul>
-   * {@link #loadBaseline} reads this layout back. Does nothing when {@code perfFilename} is
-   * {@code null}.
-   */
-  private void appendPerf(
-      @Nullable String perfFilename, NexmarkConfiguration configuration, NexmarkPerf perf) {
-    if (perfFilename == null) {
-      return;
-    }
-    List<String> lines = new ArrayList<>();
-    lines.add("");
-    lines.add(String.format("# %s", Instant.now()));
-    lines.add(String.format("# %s", configuration.toShortString()));
-    lines.add(configuration.toString());
-    lines.add(perf.toString());
-    try {
-      Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8,
-          StandardOpenOption.CREATE, StandardOpenOption.APPEND);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to write perf file: ", e);
-    }
-    NexmarkUtils.console("appended results to perf file %s.", perfFilename);
-  }
-
-  /**
-   * Load the baseline perf written by {@link #appendPerf}.
-   *
-   * <p>Blank lines and lines starting with {@code #} are skipped. The remaining lines must come
-   * in pairs: a configuration line followed by its perf line.
-   *
-   * @return the baseline keyed by configuration, or {@code null} if {@code baselineFilename} is
-   *     {@code null}
-   * @throws RuntimeException if the file cannot be read, or if it ends with a configuration line
-   *     that has no matching perf line
-   */
-  @Nullable
-  private static Map<NexmarkConfiguration, NexmarkPerf> loadBaseline(
-      @Nullable String baselineFilename) {
-    if (baselineFilename == null) {
-      return null;
-    }
-    Map<NexmarkConfiguration, NexmarkPerf> baseline = new LinkedHashMap<>();
-    List<String> lines;
-    try {
-      lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to read baseline perf file: ", e);
-    }
-    for (int i = 0; i < lines.size(); i++) {
-      String line = lines.get(i);
-      if (line.startsWith("#") || line.trim().isEmpty()) {
-        continue;
-      }
-      // A configuration line must be followed by its perf line; fail clearly on a truncated file.
-      if (i + 1 >= lines.size()) {
-        throw new RuntimeException(String.format(
-            "Baseline perf file %s ends with a configuration that has no perf line",
-            baselineFilename));
-      }
-      NexmarkConfiguration configuration = NexmarkConfiguration.fromString(line);
-      NexmarkPerf perf = NexmarkPerf.fromString(lines.get(++i));
-      baseline.put(configuration, perf);
-    }
-    NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(),
-        baselineFilename);
-    return baseline;
-  }
-
-  private static final String LINE =
-      "==========================================================================================";
-
-  /**
-   * Formats the percentage change from {@code baseline} to {@code actual} as a 13-character
-   * column. If {@code baseline} is zero, it prints {@code n/a} instead of Infinity/NaN.
-   */
-  private static String formatPercentChange(double actual, double baseline) {
-    if (baseline == 0.0) {
-      return String.format("%12s ", "n/a");
-    }
-    double diff = ((actual - baseline) / baseline) * 100.0;
-    return String.format("%+11.2f%% ", diff);
-  }
-
-  /**
-   * Print summary of {@code actual} vs (if non-null) {@code baseline} to stdout.
-   *
-   * <p>If {@code summaryFilename} is non-null, the same lines are also appended to that file.
-   */
-  private static void saveSummary(
-      @Nullable String summaryFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
-    List<String> lines = new ArrayList<>();
-
-    lines.add("");
-    lines.add(LINE);
-
-    lines.add(
-        String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now())));
-    lines.add("");
-
-    lines.add("Default configuration:");
-    lines.add(NexmarkConfiguration.DEFAULT.toString());
-    lines.add("");
-
-    lines.add("Configurations:");
-    lines.add(" Conf Description");
-    int conf = 0;
-    for (NexmarkConfiguration configuration : configurations) {
-      lines.add(String.format(" %04d %s", conf++, configuration.toShortString()));
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf != null && actualPerf.jobId != null) {
-        lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId));
-      }
-    }
-
-    lines.add("");
-    lines.add("Performance:");
-    lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)",
-        "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)"));
-    conf = 0;
-    for (NexmarkConfiguration configuration : configurations) {
-      String line = String.format(" %04d ", conf++);
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf == null) {
-        line += "*** not run ***";
-      } else {
-        NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
-
-        double runtimeSec = actualPerf.runtimeSec;
-        line += String.format("%12.1f ", runtimeSec);
-        line += baselinePerf == null
-            ? String.format("%12s ", "")
-            : formatPercentChange(runtimeSec, baselinePerf.runtimeSec);
-
-        double eventsPerSec = actualPerf.eventsPerSec;
-        line += String.format("%12.1f ", eventsPerSec);
-        line += baselinePerf == null
-            ? String.format("%12s ", "")
-            : formatPercentChange(eventsPerSec, baselinePerf.eventsPerSec);
-
-        long numResults = actualPerf.numResults;
-        line += String.format("%12d ", numResults);
-        if (baselinePerf == null) {
-          line += String.format("%12s", "");
-        } else {
-          long baselineNumResults = baselinePerf.numResults;
-          line += String.format("%+12d", numResults - baselineNumResults);
-        }
-      }
-      lines.add(line);
-
-      if (actualPerf != null) {
-        List<String> errors = actualPerf.errors;
-        if (errors == null) {
-          errors = new ArrayList<>();
-          errors.add("NexmarkLauncher returned null errors list");
-        }
-        for (String error : errors) {
-          lines.add(String.format(" %4s *** %s ***", "", error));
-        }
-      }
-    }
-
-    lines.add(LINE);
-    lines.add("");
-
-    for (String line : lines) {
-      System.out.println(line);
-    }
-
-    if (summaryFilename != null) {
-      try {
-        Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8,
-            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
-      } catch (IOException e) {
-        throw new RuntimeException("Unable to save summary file: ", e);
-      }
-      NexmarkUtils.console("appended summary to summary file %s.", summaryFilename);
-    }
-  }
-
-  /**
-   * Write all perf data and any baselines to a javascript file, which graphing pages etc. can use.
-   *
-   * <p>The file holds a single {@code var all = [...]} array with one object per configuration.
-   * Any existing file is truncated. Does nothing when {@code javascriptFilename} is {@code null}.
-   */
-  private static void saveJavascript(
-      @Nullable String javascriptFilename,
-      Iterable<NexmarkConfiguration> configurations, Map<NexmarkConfiguration, NexmarkPerf> actual,
-      @Nullable Map<NexmarkConfiguration, NexmarkPerf> baseline, Instant start) {
-    if (javascriptFilename == null) {
-      return;
-    }
-
-    List<String> lines = new ArrayList<>();
-    lines.add(String.format(
-        "// Run started %s and ran for %s", start, new Duration(start, Instant.now())));
-    lines.add("var all = [");
-
-    for (NexmarkConfiguration configuration : configurations) {
-      lines.add(" {");
-      lines.add(String.format(" config: %s", configuration));
-      NexmarkPerf actualPerf = actual.get(configuration);
-      if (actualPerf != null) {
-        lines.add(String.format(" ,perf: %s", actualPerf));
-      }
-      NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration);
-      if (baselinePerf != null) {
-        lines.add(String.format(" ,baseline: %s", baselinePerf));
-      }
-      lines.add(" },");
-    }
-
-    lines.add("];");
-
-    try {
-      Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8,
-          StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to save javascript file: ", e);
-    }
-    NexmarkUtils.console("saved javascript to file %s.", javascriptFilename);
-  }
-
-  /** Command-line entry point: parses and validates the options, then runs the suite. */
-  public static void main(String[] args) {
-    NexmarkOptions options = PipelineOptionsFactory.fromArgs(args)
-        .withValidation()
-        .as(NexmarkOptions.class);
-    NexmarkLauncher<NexmarkOptions> nexmarkLauncher = new NexmarkLauncher<>(options);
-    new Main<>().runAll(options, nexmarkLauncher);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
deleted file mode 100644
index 2f0c56a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Monitor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Distribution;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Pass-through transform that records metrics for every element it sees.
- *
- * <p>Each element passing through {@link #getTransform()} triggers these updates, and is then
- * emitted unchanged:
- * <ul>
- * <li>the {@code prefix + ".elements"} counter is incremented;
- * <li>the {@code prefix + ".bytes"} counter grows by {@link KnownSize#sizeInBytes()};
- * <li>the start/end time distributions receive the wall-clock time;
- * <li>the start/end timestamp distributions receive the element timestamp.
- * </ul>
- * All metrics live in the {@code name} namespace.
- *
- * @param <T> Type of element we are monitoring.
- */
-public class Monitor<T extends KnownSize> implements Serializable {
-  /** Updates the monitor's metrics for one element and re-emits it. */
-  private class MonitorDoFn extends DoFn<T, T> {
-    final Counter elementCounter = Metrics.counter(name, prefix + ".elements");
-    final Counter bytesCounter = Metrics.counter(name, prefix + ".bytes");
-    // The "start" and "end" distributions get identical updates. NOTE(review): presumably readers
-    // take the min of one and the max of the other; confirm against the metrics consumer.
-    final Distribution startTime = Metrics.distribution(name, prefix + ".startTime");
-    final Distribution endTime = Metrics.distribution(name, prefix + ".endTime");
-    final Distribution startTimestamp = Metrics.distribution(name, prefix + ".startTimestamp");
-    final Distribution endTimestamp = Metrics.distribution(name, prefix + ".endTimestamp");
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      T element = c.element();
-      long wallClockMillis = System.currentTimeMillis();
-      long eventMillis = c.timestamp().getMillis();
-
-      elementCounter.inc();
-      bytesCounter.inc(element.sizeInBytes());
-      startTime.update(wallClockMillis);
-      endTime.update(wallClockMillis);
-      startTimestamp.update(eventMillis);
-      endTimestamp.update(eventMillis);
-
-      c.output(element);
-    }
-  }
-
-  /** Metrics namespace. */
-  public final String name;
-  /** Prefix prepended to every metric name. */
-  public final String prefix;
-  private final MonitorDoFn doFn;
-  private final PTransform<PCollection<? extends T>, PCollection<T>> transform;
-
-  public Monitor(String name, String prefix) {
-    this.name = name;
-    this.prefix = prefix;
-    this.doFn = new MonitorDoFn();
-    this.transform = ParDo.of(this.doFn);
-  }
-
-  /** Returns the monitoring {@code ParDo}; apply it wherever the elements should be counted. */
-  public PTransform<PCollection<? extends T>, PCollection<T>> getTransform() {
-    return transform;
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
deleted file mode 100644
index 2faf3f5..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ /dev/null
@@ -1,721 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * Configuration controlling how a query is run. May be supplied by command line or
- * programmatically. We only capture properties which may influence the resulting
- * pipeline performance, as captured by {@link NexmarkPerf}.
- */
-public class NexmarkConfiguration implements Serializable {
- /**
- * Configuration with every field at its declared default value. {@link #toShortString()} only
- * renders fields whose value differs from this instance.
- */
- public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration();
-
- /** If {@literal true}, include additional debugging and monitoring stats. */
- @JsonProperty
- public boolean debug = true;
-
- /**
- * Which query to run. NOTE(review): this was documented as "in [0,9]", but
- * {@link #maxLogEvents} refers to Query10; confirm the valid range.
- */
- @JsonProperty
- public int query = 0;
-
- /** Where events come from. */
- @JsonProperty
- public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT;
-
- /** Where results are written. */
- @JsonProperty
- public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL;
-
- /**
- * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated
- * into the overall query pipeline.
- */
- @JsonProperty
- public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED;
-
- /**
- * Number of events to generate. If zero, generate as many as possible without overflowing
- * internal counters etc.
- */
- @JsonProperty
- public long numEvents = 100000;
-
- /**
- * Number of event generators to use. Each generates events in its own timeline.
- */
- @JsonProperty
- public int numEventGenerators = 100;
-
- /**
- * Shape of event rate curve.
- */
- @JsonProperty
- public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE;
-
- /**
- * Initial overall event rate (in {@link #rateUnit}).
- */
- @JsonProperty
- public int firstEventRate = 10000;
-
- /**
- * Next overall event rate (in {@link #rateUnit}).
- */
- @JsonProperty
- public int nextEventRate = 10000;
-
- /**
- * Unit for rates.
- */
- @JsonProperty
- public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND;
-
- /**
- * Overall period of rate shape, in seconds.
- */
- @JsonProperty
- public int ratePeriodSec = 600;
-
- /**
- * Time in seconds to preload the subscription with data, at the initial input rate of the
- * pipeline.
- */
- @JsonProperty
- public int preloadSeconds = 0;
-
- /**
- * Timeout, in seconds, for streaming pipelines to stop.
- */
- @JsonProperty
- public int streamTimeout = 240;
-
- /**
- * If true, and in streaming mode, generate events only when they are due according to their
- * timestamp.
- */
- @JsonProperty
- public boolean isRateLimited = false;
-
- /**
- * If true, use wallclock time as event time. Otherwise, use a deterministic
- * time in the past so that multiple runs will see exactly the same event streams
- * and should thus have exactly the same results.
- */
- @JsonProperty
- public boolean useWallclockEventTime = false;
-
- /** Average idealized size of a 'new person' event, in bytes. */
- @JsonProperty
- public int avgPersonByteSize = 200;
-
- /** Average idealized size of a 'new auction' event, in bytes. */
- @JsonProperty
- public int avgAuctionByteSize = 500;
-
- /** Average idealized size of a 'bid' event, in bytes. */
- @JsonProperty
- public int avgBidByteSize = 100;
-
- /** Ratio of bids to 'hot' auctions compared to all other auctions. */
- @JsonProperty
- public int hotAuctionRatio = 2;
-
- /** Ratio of auctions for 'hot' sellers compared to all other people. */
- @JsonProperty
- public int hotSellersRatio = 4;
-
- /** Ratio of bids for 'hot' bidders compared to all other people. */
- @JsonProperty
- public int hotBiddersRatio = 4;
-
- /** Window size, in seconds, for queries 3, 5, 7 and 8. */
- @JsonProperty
- public long windowSizeSec = 10;
-
- /** Sliding window period, in seconds, for query 5. */
- @JsonProperty
- public long windowPeriodSec = 5;
-
- /** Number of seconds to hold back events according to their reported timestamp. */
- @JsonProperty
- public long watermarkHoldbackSec = 0;
-
- /** Average number of auctions which should be in flight at any time, per generator. */
- @JsonProperty
- public int numInFlightAuctions = 100;
-
- /** Maximum number of people to consider as active for placing auctions or bids. */
- @JsonProperty
- public int numActivePeople = 1000;
-
- /** Coder strategy to follow. */
- @JsonProperty
- public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND;
-
- /**
- * Delay, in milliseconds, for each event. This will peg one core for this number
- * of milliseconds to simulate CPU-bound computation.
- */
- @JsonProperty
- public long cpuDelayMs = 0;
-
- /**
- * Extra data, in bytes, to save to persistent state for each event. This will force
- * i/o all the way to durable storage to simulate an I/O-bound computation.
- */
- @JsonProperty
- public long diskBusyBytes = 0;
-
- /**
- * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction.
- */
- @JsonProperty
- public int auctionSkip = 123;
-
- /**
- * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum).
- */
- @JsonProperty
- public int fanout = 5;
-
- /**
- * Maximum time, in seconds of event time, that query 3 keeps a person in state while waiting
- * for the auctions related to that person, before cleaning that state up.
- */
- @JsonProperty
- public int maxAuctionsWaitingTime = 600;
-
- /**
- * Length of occasional delay to impose on events (in seconds).
- */
- @JsonProperty
- public long occasionalDelaySec = 3;
-
- /**
- * Probability that an event will be delayed. NOTE(review): the original text said
- * "delayed by delayS", presumably meaning {@link #occasionalDelaySec}; confirm.
- */
- @JsonProperty
- public double probDelayedEvent = 0.1;
-
- /**
- * Maximum size of each log file (in events). For Query10 only.
- */
- @JsonProperty
- public int maxLogEvents = 100_000;
-
- /**
- * If true, use pub/sub publish time instead of event time.
- */
- @JsonProperty
- public boolean usePubsubPublishTime = false;
-
- /**
- * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies
- * every 1000 events per generator are emitted in pseudo-random order.
- */
- @JsonProperty
- public long outOfOrderGroupSize = 1;
-
-  /**
-   * Replace any properties of this configuration which have been supplied by the command line.
-   *
-   * <p>For each option whose getter returns a non-{@code null} value, the field of the same name
-   * is overwritten. Options left unset ({@code null}) keep the field's current value.
-   *
-   * @param options command-line options to read overrides from
-   */
-  public void overrideFromOptions(NexmarkOptions options) {
-    if (options.getDebug() != null) {
-      debug = options.getDebug();
-    }
-    if (options.getQuery() != null) {
-      query = options.getQuery();
-    }
-    if (options.getSourceType() != null) {
-      sourceType = options.getSourceType();
-    }
-    if (options.getSinkType() != null) {
-      sinkType = options.getSinkType();
-    }
-    if (options.getPubSubMode() != null) {
-      pubSubMode = options.getPubSubMode();
-    }
-    if (options.getNumEvents() != null) {
-      numEvents = options.getNumEvents();
-    }
-    if (options.getNumEventGenerators() != null) {
-      numEventGenerators = options.getNumEventGenerators();
-    }
-    if (options.getRateShape() != null) {
-      rateShape = options.getRateShape();
-    }
-    if (options.getFirstEventRate() != null) {
-      firstEventRate = options.getFirstEventRate();
-    }
-    if (options.getNextEventRate() != null) {
-      nextEventRate = options.getNextEventRate();
-    }
-    if (options.getRateUnit() != null) {
-      rateUnit = options.getRateUnit();
-    }
-    if (options.getRatePeriodSec() != null) {
-      ratePeriodSec = options.getRatePeriodSec();
-    }
-    if (options.getPreloadSeconds() != null) {
-      preloadSeconds = options.getPreloadSeconds();
-    }
-    if (options.getStreamTimeout() != null) {
-      streamTimeout = options.getStreamTimeout();
-    }
-    if (options.getIsRateLimited() != null) {
-      isRateLimited = options.getIsRateLimited();
-    }
-    if (options.getUseWallclockEventTime() != null) {
-      useWallclockEventTime = options.getUseWallclockEventTime();
-    }
-    if (options.getAvgPersonByteSize() != null) {
-      avgPersonByteSize = options.getAvgPersonByteSize();
-    }
-    if (options.getAvgAuctionByteSize() != null) {
-      avgAuctionByteSize = options.getAvgAuctionByteSize();
-    }
-    if (options.getAvgBidByteSize() != null) {
-      avgBidByteSize = options.getAvgBidByteSize();
-    }
-    if (options.getHotAuctionRatio() != null) {
-      hotAuctionRatio = options.getHotAuctionRatio();
-    }
-    if (options.getHotSellersRatio() != null) {
-      hotSellersRatio = options.getHotSellersRatio();
-    }
-    if (options.getHotBiddersRatio() != null) {
-      hotBiddersRatio = options.getHotBiddersRatio();
-    }
-    if (options.getWindowSizeSec() != null) {
-      windowSizeSec = options.getWindowSizeSec();
-    }
-    if (options.getWindowPeriodSec() != null) {
-      windowPeriodSec = options.getWindowPeriodSec();
-    }
-    if (options.getWatermarkHoldbackSec() != null) {
-      watermarkHoldbackSec = options.getWatermarkHoldbackSec();
-    }
-    if (options.getNumInFlightAuctions() != null) {
-      numInFlightAuctions = options.getNumInFlightAuctions();
-    }
-    if (options.getNumActivePeople() != null) {
-      numActivePeople = options.getNumActivePeople();
-    }
-    if (options.getCoderStrategy() != null) {
-      coderStrategy = options.getCoderStrategy();
-    }
-    if (options.getCpuDelayMs() != null) {
-      cpuDelayMs = options.getCpuDelayMs();
-    }
-    if (options.getDiskBusyBytes() != null) {
-      diskBusyBytes = options.getDiskBusyBytes();
-    }
-    if (options.getAuctionSkip() != null) {
-      auctionSkip = options.getAuctionSkip();
-    }
-    if (options.getFanout() != null) {
-      fanout = options.getFanout();
-    }
-    if (options.getMaxAuctionsWaitingTime() != null) {
-      maxAuctionsWaitingTime = options.getMaxAuctionsWaitingTime();
-    }
-    if (options.getOccasionalDelaySec() != null) {
-      occasionalDelaySec = options.getOccasionalDelaySec();
-    }
-    if (options.getProbDelayedEvent() != null) {
-      probDelayedEvent = options.getProbDelayedEvent();
-    }
-    if (options.getMaxLogEvents() != null) {
-      maxLogEvents = options.getMaxLogEvents();
-    }
-    if (options.getUsePubsubPublishTime() != null) {
-      usePubsubPublishTime = options.getUsePubsubPublishTime();
-    }
-    if (options.getOutOfOrderGroupSize() != null) {
-      outOfOrderGroupSize = options.getOutOfOrderGroupSize();
-    }
-  }
-
-  /**
-   * Returns a new configuration with every field copied from this one.
-   *
-   * <p>Reassigning fields on the returned object does not affect this instance.
-   *
-   * @return a field-by-field duplicate of this configuration
-   */
-  public NexmarkConfiguration copy() {
-    NexmarkConfiguration duplicate = new NexmarkConfiguration();
-
-    // Query selection and I/O.
-    duplicate.debug = this.debug;
-    duplicate.query = this.query;
-    duplicate.sourceType = this.sourceType;
-    duplicate.sinkType = this.sinkType;
-    duplicate.pubSubMode = this.pubSubMode;
-
-    // Event generation volume and rate.
-    duplicate.numEvents = this.numEvents;
-    duplicate.numEventGenerators = this.numEventGenerators;
-    duplicate.rateShape = this.rateShape;
-    duplicate.firstEventRate = this.firstEventRate;
-    duplicate.nextEventRate = this.nextEventRate;
-    duplicate.rateUnit = this.rateUnit;
-    duplicate.ratePeriodSec = this.ratePeriodSec;
-    duplicate.preloadSeconds = this.preloadSeconds;
-    duplicate.streamTimeout = this.streamTimeout;
-    duplicate.isRateLimited = this.isRateLimited;
-    duplicate.useWallclockEventTime = this.useWallclockEventTime;
-
-    // Event sizes and skew.
-    duplicate.avgPersonByteSize = this.avgPersonByteSize;
-    duplicate.avgAuctionByteSize = this.avgAuctionByteSize;
-    duplicate.avgBidByteSize = this.avgBidByteSize;
-    duplicate.hotAuctionRatio = this.hotAuctionRatio;
-    duplicate.hotSellersRatio = this.hotSellersRatio;
-    duplicate.hotBiddersRatio = this.hotBiddersRatio;
-
-    // Windowing and generator state.
-    duplicate.windowSizeSec = this.windowSizeSec;
-    duplicate.windowPeriodSec = this.windowPeriodSec;
-    duplicate.watermarkHoldbackSec = this.watermarkHoldbackSec;
-    duplicate.numInFlightAuctions = this.numInFlightAuctions;
-    duplicate.numActivePeople = this.numActivePeople;
-
-    // Execution tuning and per-query knobs.
-    duplicate.coderStrategy = this.coderStrategy;
-    duplicate.cpuDelayMs = this.cpuDelayMs;
-    duplicate.diskBusyBytes = this.diskBusyBytes;
-    duplicate.auctionSkip = this.auctionSkip;
-    duplicate.fanout = this.fanout;
-    duplicate.maxAuctionsWaitingTime = this.maxAuctionsWaitingTime;
-    duplicate.occasionalDelaySec = this.occasionalDelaySec;
-    duplicate.probDelayedEvent = this.probDelayedEvent;
-    duplicate.maxLogEvents = this.maxLogEvents;
-    duplicate.usePubsubPublishTime = this.usePubsubPublishTime;
-    duplicate.outOfOrderGroupSize = this.outOfOrderGroupSize;
-
-    return duplicate;
-  }
-
-  /**
-   * Return short description of configuration (suitable for use in logging). We only render
-   * the core fields plus those which do not have default values.
-   *
-   * <p>The query number is always rendered. Every other field is appended as
-   * {@code "; name:value"} only when it differs from {@link #DEFAULT}, using the field's own
-   * name as the label. The two event rates are rendered together whenever either one differs.
-   *
-   * @return a one-line description of this configuration
-   */
-  public String toShortString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(String.format("query:%d", query));
-    if (debug != DEFAULT.debug) {
-      sb.append(String.format("; debug:%s", debug));
-    }
-    if (sourceType != DEFAULT.sourceType) {
-      sb.append(String.format("; sourceType:%s", sourceType));
-    }
-    if (sinkType != DEFAULT.sinkType) {
-      sb.append(String.format("; sinkType:%s", sinkType));
-    }
-    if (pubSubMode != DEFAULT.pubSubMode) {
-      sb.append(String.format("; pubSubMode:%s", pubSubMode));
-    }
-    if (numEvents != DEFAULT.numEvents) {
-      sb.append(String.format("; numEvents:%d", numEvents));
-    }
-    if (numEventGenerators != DEFAULT.numEventGenerators) {
-      sb.append(String.format("; numEventGenerators:%d", numEventGenerators));
-    }
-    if (rateShape != DEFAULT.rateShape) {
-      sb.append(String.format("; rateShape:%s", rateShape));
-    }
-    if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) {
-      sb.append(String.format("; firstEventRate:%d", firstEventRate));
-      sb.append(String.format("; nextEventRate:%d", nextEventRate));
-    }
-    if (rateUnit != DEFAULT.rateUnit) {
-      sb.append(String.format("; rateUnit:%s", rateUnit));
-    }
-    if (ratePeriodSec != DEFAULT.ratePeriodSec) {
-      sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec));
-    }
-    if (preloadSeconds != DEFAULT.preloadSeconds) {
-      sb.append(String.format("; preloadSeconds:%d", preloadSeconds));
-    }
-    if (streamTimeout != DEFAULT.streamTimeout) {
-      sb.append(String.format("; streamTimeout:%d", streamTimeout));
-    }
-    if (isRateLimited != DEFAULT.isRateLimited) {
-      sb.append(String.format("; isRateLimited:%s", isRateLimited));
-    }
-    if (useWallclockEventTime != DEFAULT.useWallclockEventTime) {
-      sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime));
-    }
-    if (avgPersonByteSize != DEFAULT.avgPersonByteSize) {
-      sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize));
-    }
-    if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) {
-      sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize));
-    }
-    if (avgBidByteSize != DEFAULT.avgBidByteSize) {
-      sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize));
-    }
-    if (hotAuctionRatio != DEFAULT.hotAuctionRatio) {
-      sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio));
-    }
-    if (hotSellersRatio != DEFAULT.hotSellersRatio) {
-      sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio));
-    }
-    if (hotBiddersRatio != DEFAULT.hotBiddersRatio) {
-      sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio));
-    }
-    if (windowSizeSec != DEFAULT.windowSizeSec) {
-      sb.append(String.format("; windowSizeSec:%d", windowSizeSec));
-    }
-    if (windowPeriodSec != DEFAULT.windowPeriodSec) {
-      sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec));
-    }
-    if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) {
-      sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec));
-    }
-    if (numInFlightAuctions != DEFAULT.numInFlightAuctions) {
-      sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions));
-    }
-    if (numActivePeople != DEFAULT.numActivePeople) {
-      sb.append(String.format("; numActivePeople:%d", numActivePeople));
-    }
-    if (coderStrategy != DEFAULT.coderStrategy) {
-      sb.append(String.format("; coderStrategy:%s", coderStrategy));
-    }
-    if (cpuDelayMs != DEFAULT.cpuDelayMs) {
-      sb.append(String.format("; cpuDelayMs:%d", cpuDelayMs));
-    }
-    if (diskBusyBytes != DEFAULT.diskBusyBytes) {
-      sb.append(String.format("; diskBusyBytes:%d", diskBusyBytes));
-    }
-    if (auctionSkip != DEFAULT.auctionSkip) {
-      sb.append(String.format("; auctionSkip:%d", auctionSkip));
-    }
-    if (fanout != DEFAULT.fanout) {
-      sb.append(String.format("; fanout:%d", fanout));
-    }
-    if (maxAuctionsWaitingTime != DEFAULT.maxAuctionsWaitingTime) {
-      sb.append(String.format("; maxAuctionsWaitingTime:%d", maxAuctionsWaitingTime));
-    }
-    if (occasionalDelaySec != DEFAULT.occasionalDelaySec) {
-      sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec));
-    }
-    if (probDelayedEvent != DEFAULT.probDelayedEvent) {
-      sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent));
-    }
-    if (maxLogEvents != DEFAULT.maxLogEvents) {
-      sb.append(String.format("; maxLogEvents:%d", maxLogEvents));
-    }
-    if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) {
-      sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime));
-    }
-    if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) {
-      sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize));
-    }
-    return sb.toString();
-  }
-
- /**
- * Return full description as a string.
- */
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Parse an object from {@code string}.
- */
- public static NexmarkConfiguration fromString(String string) {
- try {
- return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class);
- } catch (IOException e) {
- throw new RuntimeException("Unable to parse nexmark configuration: ", e);
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- debug,
- query,
- sourceType,
- sinkType,
- pubSubMode,
- numEvents,
- numEventGenerators,
- rateShape,
- firstEventRate,
- nextEventRate,
- rateUnit,
- ratePeriodSec,
- preloadSeconds,
- streamTimeout,
- isRateLimited,
- useWallclockEventTime,
- avgPersonByteSize,
- avgAuctionByteSize,
- avgBidByteSize,
- hotAuctionRatio,
- hotSellersRatio,
- hotBiddersRatio,
- windowSizeSec,
- windowPeriodSec,
- watermarkHoldbackSec,
- numInFlightAuctions,
- numActivePeople,
- coderStrategy,
- cpuDelayMs,
- diskBusyBytes,
- auctionSkip,
- fanout,
- maxAuctionsWaitingTime,
- occasionalDelaySec,
- probDelayedEvent,
- maxLogEvents,
- usePubsubPublishTime,
- outOfOrderGroupSize);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- NexmarkConfiguration other = (NexmarkConfiguration) obj;
- if (debug != other.debug) {
- return false;
- }
- if (auctionSkip != other.auctionSkip) {
- return false;
- }
- if (avgAuctionByteSize != other.avgAuctionByteSize) {
- return false;
- }
- if (avgBidByteSize != other.avgBidByteSize) {
- return false;
- }
- if (avgPersonByteSize != other.avgPersonByteSize) {
- return false;
- }
- if (coderStrategy != other.coderStrategy) {
- return false;
- }
- if (cpuDelayMs != other.cpuDelayMs) {
- return false;
- }
- if (diskBusyBytes != other.diskBusyBytes) {
- return false;
- }
- if (fanout != other.fanout) {
- return false;
- }
- if (maxAuctionsWaitingTime != other.maxAuctionsWaitingTime) {
- return false;
- }
- if (firstEventRate != other.firstEventRate) {
- return false;
- }
- if (hotAuctionRatio != other.hotAuctionRatio) {
- return false;
- }
- if (hotBiddersRatio != other.hotBiddersRatio) {
- return false;
- }
- if (hotSellersRatio != other.hotSellersRatio) {
- return false;
- }
- if (isRateLimited != other.isRateLimited) {
- return false;
- }
- if (maxLogEvents != other.maxLogEvents) {
- return false;
- }
- if (nextEventRate != other.nextEventRate) {
- return false;
- }
- if (rateUnit != other.rateUnit) {
- return false;
- }
- if (numEventGenerators != other.numEventGenerators) {
- return false;
- }
- if (numEvents != other.numEvents) {
- return false;
- }
- if (numInFlightAuctions != other.numInFlightAuctions) {
- return false;
- }
- if (numActivePeople != other.numActivePeople) {
- return false;
- }
- if (occasionalDelaySec != other.occasionalDelaySec) {
- return false;
- }
- if (preloadSeconds != other.preloadSeconds) {
- return false;
- }
- if (streamTimeout != other.streamTimeout) {
- return false;
- }
- if (Double.doubleToLongBits(probDelayedEvent)
- != Double.doubleToLongBits(other.probDelayedEvent)) {
- return false;
- }
- if (pubSubMode != other.pubSubMode) {
- return false;
- }
- if (ratePeriodSec != other.ratePeriodSec) {
- return false;
- }
- if (rateShape != other.rateShape) {
- return false;
- }
- if (query != other.query) {
- return false;
- }
- if (sinkType != other.sinkType) {
- return false;
- }
- if (sourceType != other.sourceType) {
- return false;
- }
- if (useWallclockEventTime != other.useWallclockEventTime) {
- return false;
- }
- if (watermarkHoldbackSec != other.watermarkHoldbackSec) {
- return false;
- }
- if (windowPeriodSec != other.windowPeriodSec) {
- return false;
- }
- if (windowSizeSec != other.windowSizeSec) {
- return false;
- }
- if (usePubsubPublishTime != other.usePubsubPublishTime) {
- return false;
- }
- if (outOfOrderGroupSize != other.outOfOrderGroupSize) {
- return false;
- }
- return true;
- }
-}