Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:47 UTC
[39/55] [abbrv] beam git commit: Change benchmark workload settings
Change benchmark workload settings
Update event generation configuration to add some variation
Update execution matrix (issue #45)
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbd1b155
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbd1b155
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbd1b155
Branch: refs/heads/master
Commit: dbd1b155c32c19ce7a6d0c0f0dffb318c9ccdde7
Parents: 683680b
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 9 11:48:00 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/README.md | 207 +++++++++++--------
.../nexmark/NexmarkConfiguration.java | 10 +-
2 files changed, 128 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/dbd1b155/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
index a3549f4..a9acd63 100644
--- a/integration/java/nexmark/README.md
+++ b/integration/java/nexmark/README.md
@@ -30,14 +30,14 @@ These are multiple queries over a three entities model representing on online au
- **Auction** represents an item under auction.
- **Bid** represents a bid for an item under auction.
-The queries exercise many aspects of dataflow model on Beam:
+The queries exercise many aspects of the Beam model:
* **Query1**: What are the bid values in euros?
Illustrates a simple map.
* **Query2**: What are the auctions with particular auction numbers?
Illustrates a simple filter.
* **Query3**: Who is selling in particular US states?
- Illustrates an incremental join (using per-key state) and filter.
+ Illustrates an incremental join (using per-key state and timer) and filter.
* **Query4**: What is the average selling price for each auction
category?
Illustrates complex join (using custom window functions) and
@@ -71,19 +71,17 @@ We have augmented the original queries with five more:
compared with event time in non-Global windows for all the other
queries.
-The queries can be executed using a 'Driver' for a given backend.
-Currently the supported drivers are:
+The Beam runner is selected with Maven profiles. The available profiles are:
-* **NexmarkApexDriver** for running via the Apex runner.
-* **NexmarkDirectDriver** for running locally on a single machine.
-* **NexmarkGoogleDriver** for running on the Google Cloud Dataflow service.
- Requires a Google Cloud account.
-* **NexmarkFlinkDriver** for running on a Flink cluster. Requires the
- cluster to be established and the Nexmark jar to be distributed to
- each worker.
-* **NexmarkSparkDriver** for running on a Spark cluster.
+* direct-runner
+* spark-runner
+* flink-runner
+* apex-runner
+
+The runner must also be specified, as in any other Beam pipeline, using
+
+ --runner
-Other drivers are straightforward.
Test data is deterministically synthesized on demand. The test
data may be synthesized in the same pipeline as the query itself,
@@ -97,11 +95,6 @@ The query results may be:
* Sent to BigQuery.
* Discarded.
-Options are provided for measuring progress, measuring overall
-pipeline performance, and comparing that performance against a known
-baseline. However that machinery has only been implemented against
-the Google Cloud Dataflow driver.
-
# Configuration
## Common configuration parameters
@@ -119,45 +112,48 @@ Run query N
--query=N
## Available Suites
+The suite to run can be chosen using this configuration parameter:
-- DEFAULT: Test default configuration with query 0.
-- SMOKE: Run the 12 default configurations.
-- STRESS: Like smoke but for 1m events.
-- FULL_THROTTLE: Like SMOKE but 100m events.
+ --suite=SUITE
- --suite=SMOKE
+Available suites are:
+* DEFAULT: Test default configuration with query 0.
+* SMOKE: Run the 12 default configurations.
+* STRESS: Like SMOKE but for 1M events.
+* FULL_THROTTLE: Like SMOKE but for 100M events.
-### Apex specific configuration
+
- --suite=SMOKE --manageResources=false --monitorJobs=true
+## Apex specific configuration
-### Dataflow specific configuration
+ --manageResources=false --monitorJobs=false
- --query=0 --suite=SMOKE --manageResources=false --monitorJobs=true \
+## Dataflow specific configuration
+
+ --manageResources=false --monitorJobs=true \
     --enforceEncodability=false --enforceImmutability=false \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
- --stagingLocation=<a gs path for staging>
-
- --runner=BlockingDataflowRunner \
+ --stagingLocation=<a gs path for staging> \
+ --runner=DataflowRunner \
--tempLocation=gs://talend-imejia/nexmark/temp/ \
--stagingLocation=gs://talend-imejia/nexmark/temp/staging/ \
--filesToStage=target/beam-integration-java-0.7.0-SNAPSHOT.jar
-### Direct specific configuration
+## Direct specific configuration
- --suite=SMOKE --manageResources=false --monitorJobs=true \
+ --manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
-### Flink specific configuration
+## Flink specific configuration
- --suite=SMOKE --manageResources=false --monitorJobs=true \
- --flinkMaster=[local] --parallelism=#numcores
+ --manageResources=false --monitorJobs=true \
+ --flinkMaster=local --parallelism=#numcores
-### Spark specific configuration
+## Spark specific configuration
- --suite=SMOKE --manageResources=false --monitorJobs=true \
+ --manageResources=false --monitorJobs=true \
--sparkMaster=local \
-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
@@ -167,39 +163,39 @@ Open issues are tracked [here](https://github.com../../../../../issues):
## Batch / Synthetic / Local
-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- |
-| 0 | ok | ok | ok | ok |
-| 1 | ok | ok | ok | ok |
-| 2 | ok | ok | ok | ok |
-| 3 | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) |
-| 4 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 5 | ok | ok | ok | ok |
-| 6 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 7 | ok | ok | ok | [#24](../../../../../issues/24) |
-| 8 | ok | ok | ok | ok |
-| 9 | ok | ok | [#2](../../../../../issues/2) | ok |
-| 10 | [#5](../../../../../issues/5) | ok | ok | ok |
-| 11 | ok | ok | ok | ok |
-| 12 | ok | ok | ok | ok |
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+| 0 | ok | ok | ok | ok |
+| 1 | ok | ok | ok | ok |
+| 2 | ok | ok | ok | ok |
+| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | ok | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+| 4 | ok | ok | ok | ok |
+| 5 | ok | ok | ok | ok |
+| 6 | ok | ok | ok | ok |
+| 7 | ok | ok | ok | ok |
+| 8 | ok | ok | ok | ok |
+| 9 | ok | ok | ok | ok |
+| 10 | ok | ok | ok | ok |
+| 11 | ok | ok | ok | ok |
+| 12 | ok | ok | ok | ok |
## Streaming / Synthetic / Local
-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
-| 0 | ok | | | ok |
-| 1 | ok | | | ok |
-| 2 | ok | | | ok |
-| 3 | [#7](../../../../../issues/7) | | | [#7](../../../../../issues/7) |
-| 4 | ok | | | ok |
-| 5 | ok | | | ok |
-| 6 | ok | | | ok |
-| 7 | ok | | | ? |
-| 8 | ok | | | ok |
-| 9 | ok | | | ok |
-| 10 | [#5](../../../../../issues/5) | | | ? |
-| 11 | ok | | | Ok |
-| 12 | ok | | | Ok |
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------ | ------------------------------------------------------------ | ---------------------------------------------------------- | ------------------------------------------------------------ |
+| 0 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 1 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 2 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 3 | ok | [BEAM-1035](https://issues.apache.org/jira/browse/BEAM-1035) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | [BEAM-1037](https://issues.apache.org/jira/browse/BEAM-1037) |
+| 4 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 5 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 6 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 7 | ok | [BEAM-2112](https://issues.apache.org/jira/browse/BEAM-2112) | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 8 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 9 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 10 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 11 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
+| 12 | ok | ok | [BEAM-593](https://issues.apache.org/jira/browse/BEAM-593) | ok |
## Batch / Synthetic / Cluster
@@ -219,26 +215,63 @@ TODO
# Running Nexmark
-## Running on the DirectRunner (local)
+## Running SMOKE suite on the DirectRunner (local)
Batch Mode
--Dexec.classpathScope="test"
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=false --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pdirect-runner -Dexec.args="--runner=DirectRunner --suite=SMOKE --streaming=true --manageResources=false --monitorJobs=true --enforceEncodability=true --enforceImmutability=true"
+
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=false --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+## Running SMOKE suite on the SparkRunner (local)
+
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true"
Streaming Mode
- mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.drivers.NexmarkDirectDriver -Dexec.args="--suite=SMOKE --streaming=true --numEventGenerators=4 --manageResources=false --monitorJobs=false --enforceEncodability=false --enforceImmutability=false"
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pspark-runner "-Dexec.args=--runner=SparkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true"
+
+
+## Running SMOKE suite on the FlinkRunner (local)
-## Running on Google Cloud Dataflow
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true --flinkMaster=local"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Pflink-runner "-Dexec.args=--runner=FlinkRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=true --flinkMaster=local"
+
+
+## Running SMOKE suite on the ApexRunner (local)
+
+Batch Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=false"
+
+Streaming Mode
+
+ mvn exec:java -Dexec.mainClass=org.apache.beam.integration.nexmark.Main -Papex-runner "-Dexec.args=--runner=ApexRunner --suite=SMOKE --streamTimeout=60 --streaming=true --manageResources=false --monitorJobs=false"
+
+
+## Running SMOKE suite on Google Cloud Dataflow
+
+Building package
+
+ mvn clean package -Pdataflow-runner
+
+Submit to Google Dataflow service
-An example invocation for **Query10** on the Google Cloud Dataflow
-service.
```
-java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
- org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
+java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+ org.apache.beam.integration.nexmark.Main \
+  --runner=DataflowRunner \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
@@ -253,7 +286,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
--numEventGenerators=64 \
--numWorkers=16 \
--maxNumWorkers=16 \
- --query=10 \
+ --suite=SMOKE \
--firstEventRate=100000 \
--nextEventRate=100000 \
--ratePeriodSec=3600 \
@@ -270,8 +303,9 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
```
```
-java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
- org.apache.beam.integration.nexmark.drivers.NexmarkGoogleDriver \
+java -cp integration/java/target/beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar \
+ org.apache.beam.integration.nexmark.Main \
+  --runner=DataflowRunner \
--project=<your project> \
--zone=<your zone> \
--workerMachineType=n1-highmem-8 \
@@ -285,7 +319,7 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
--monitorJobs=false \
--numWorkers=64 \
--maxNumWorkers=64 \
- --query=10 \
+ --suite=SMOKE \
--usePubsubPublishTime=true \
--outputPath=<a gs path under which log files will be written> \
--windowSizeSec=600 \
@@ -294,8 +328,13 @@ java -cp integration/java/target/java-integration-all-bundled-0.2.0-incubating-S
--experiments=enable_custom_pubsub_source
```
-## Running on Flink
+## Running query 0 on a Spark cluster with YARN
+
+Building package
+
+ mvn clean package -Pspark-runner
+
+Submit to the cluster
+
+ spark-submit --master yarn-client --class org.apache.beam.integration.nexmark.Main --driver-memory 512m --executor-memory 512m --executor-cores 1 beam-integration-java-nexmark-bundled-2.1.0-SNAPSHOT.jar --runner=SparkRunner --query=0 --streamTimeout=60 --streaming=false --manageResources=false --monitorJobs=true
-See [BEAM_ON_FLINK_ON_GCP](./BEAM_ON_FLINK_ON_GCP.md) for instructions
-on running a NexMark pipeline using Flink hosted on a Google Compute
-Platform cluster.
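The NexmarkConfiguration.java diff that follows changes the generator defaults "to add some variation": hotAuctionRatio goes from 1 to 2, hotSellersRatio and hotBiddersRatio from 1 to 4, occasionalDelaySec from 0 to 3 and probDelayedEvent from 0.0 to 0.1. The Java sketch below illustrates what such values could mean for the synthetic stream. It rests on two assumptions that this diff does not confirm: that the generator treats a bid as hot when `random.nextInt(ratio) > 0` (so a ratio of 1 never yields hot items and a ratio of N yields them with probability (N - 1) / N), and, reading the `probDelayedEvent` javadoc literally, that an event is delayed by exactly `occasionalDelaySec` seconds with probability `probDelayedEvent`. The class `NexmarkWorkloadSketch` and its methods are hypothetical and are not part of the Nexmark code base.

```java
import java.util.Random;

/**
 * Hypothetical sketch, not the Nexmark generator: illustrates how a "hot" ratio
 * and an occasional fixed delay could shape a synthetic event stream.
 */
public class NexmarkWorkloadSketch {

  /**
   * Assumption: a bid targets a hot auction when random.nextInt(ratio) > 0.
   * With ratio == 1, nextInt(1) always returns 0, so no bid is ever hot.
   * The ratio must be at least 1 (nextInt(0) throws IllegalArgumentException).
   */
  public static boolean isHotBid(Random random, int hotAuctionRatio) {
    return random.nextInt(hotAuctionRatio) > 0;
  }

  /** Expected share of hot bids under the assumption above: (ratio - 1) / ratio. */
  public static double hotFraction(int ratio) {
    return (ratio - 1) / (double) ratio;
  }

  /**
   * Assumption taken from the probDelayedEvent javadoc: an event is delayed by
   * occasionalDelaySec seconds with probability probDelayedEvent, else not at all.
   */
  public static long eventDelaySec(Random random, double probDelayedEvent, long occasionalDelaySec) {
    return random.nextDouble() < probDelayedEvent ? occasionalDelaySec : 0L;
  }

  public static void main(String[] args) {
    Random random = new Random(42); // fixed seed so runs are repeatable
    int trials = 100_000;

    // Compare the old default ratio (1) with the new ones from this commit (2 and 4).
    int[] ratios = {1, 2, 4};
    for (int ratio : ratios) {
      int hot = 0;
      for (int i = 0; i < trials; i++) {
        if (isHotBid(random, ratio)) {
          hot++;
        }
      }
      System.out.printf("ratio=%d: observed hot share %.3f, expected %.3f%n",
          ratio, hot / (double) trials, hotFraction(ratio));
    }

    // New delay defaults: probDelayedEvent = 0.1, occasionalDelaySec = 3.
    long totalDelaySec = 0;
    for (int i = 0; i < trials; i++) {
      totalDelaySec += eventDelaySec(random, 0.1, 3);
    }
    System.out.printf("mean delay per event: observed %.3f s, expected %.3f s%n",
        totalDelaySec / (double) trials, 0.1 * 3);
  }
}
```

Under these assumptions, the old ratio of 1 disabled hot items entirely, while the new defaults send roughly half of the bids to hot auctions (ratio 2) and roughly three quarters of auctions and bids to hot sellers and bidders (ratio 4), and the delay settings add about 0.3 s of delay per event on average. Seeding `Random` keeps the synthetic workload reproducible across benchmark runs.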
http://git-wip-us.apache.org/repos/asf/beam/blob/dbd1b155/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
index 1da08b4..5a8cb71 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java
@@ -140,15 +140,15 @@ public class NexmarkConfiguration implements Serializable {
/** Ratio of bids to 'hot' auctions compared to all other auctions. */
@JsonProperty
- public int hotAuctionRatio = 1;
+ public int hotAuctionRatio = 2;
/** Ratio of auctions for 'hot' sellers compared to all other people. */
@JsonProperty
- public int hotSellersRatio = 1;
+ public int hotSellersRatio = 4;
/** Ratio of bids for 'hot' bidders compared to all other people. */
@JsonProperty
- public int hotBiddersRatio = 1;
+ public int hotBiddersRatio = 4;
/** Window size, in seconds, for queries 3, 5, 7 and 8. */
@JsonProperty
@@ -211,13 +211,13 @@ public class NexmarkConfiguration implements Serializable {
* Length of occasional delay to impose on events (in seconds).
*/
@JsonProperty
- public long occasionalDelaySec = 0;
+ public long occasionalDelaySec = 3;
/**
* Probability that an event will be delayed by delayS.
*/
@JsonProperty
- public double probDelayedEvent = 0.0;
+ public double probDelayedEvent = 0.1;
/**
* Maximum size of each log file (in events). For Query10 only.