You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:25 UTC
[17/55] [abbrv] beam git commit: Fix Apex driver and update execution
matrix
Fix Apex driver and update execution matrix
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ce9bf07
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ce9bf07
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ce9bf07
Branch: refs/heads/master
Commit: 9ce9bf076032e1c9aeb3a6dce806ad4b96127157
Parents: 1bd5735
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Mar 21 18:29:20 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/README.md | 109 +++++++++++--------
integration/java/nexmark/pom.xml | 27 ++++-
.../integration/nexmark/NexmarkApexRunner.java | 5 -
.../nexmark/NexmarkDirectRunner.java | 5 -
.../integration/nexmark/NexmarkFlinkRunner.java | 5 -
.../nexmark/NexmarkGoogleRunner.java | 5 -
.../beam/integration/nexmark/NexmarkRunner.java | 9 --
.../integration/nexmark/NexmarkSparkRunner.java | 5 -
.../apache/beam/integration/nexmark/Query5.java | 1 -
.../nexmark/src/main/resources/log4j.properties | 9 ++
.../nexmark/UnboundedEventSourceTest.java | 4 +-
11 files changed, 100 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
index 4c08c28..7a91ab2 100644
--- a/integration/java/nexmark/README.md
+++ b/integration/java/nexmark/README.md
@@ -122,63 +122,80 @@ Number of events generators
--numEventGenerators=4
-## Flink specific configuration
+## Apex specific configuration
---suite=SMOKE --manageResources=false --monitorJobs=false \
---flinkMaster=local
+--suite=SMOKE --manageResources=false --monitorJobs=true
## Direct specific configuration
---suite=SMOKE --manageResources=false --monitorJobs=false \
+--suite=SMOKE --manageResources=false --monitorJobs=true \
--enforceEncodability=false --enforceImmutability=false
+## Flink specific configuration
+
+--suite=SMOKE --manageResources=false --monitorJobs=true \
+--flinkMaster=local
+
## Spark specific configuration
---suite=SMOKE
---manageResources=false --monitorJobs=false --sparkMaster=local
--Dspark.ui.enabled=false
--DSPARK_LOCAL_IP=localhost
--Dsun.io.serialization.extendedDebugInfo=true
+--suite=SMOKE --manageResources=false --monitorJobs=true --sparkMaster=local \
+-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
# Current Status
-Open issues are currently opened on [github](https://github.com/iemejia/beam/issues):
-
-## Batch Mode / Synthetic / Local
-
-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------ | ------ | ------ | ------ |
-| 0 | Ok | #1 | Ok | |
-| 1 | Ok | #1 | Ok | |
-| 2 | Ok | NEX-01 | Ok | |
-| 3 | NEX-07 | NEX-07 | NEX-07 | |
-| 4 | Ok | Ok | NEX-02 | |
-| 5 | Ok | NEX-03 | Ok | |
-| 6 | Ok | OK | NEX-02 | |
-| 7 | Ok | NEX-01 | Ok | |
-| 8 | Ok | NEX-01 | Ok | |
-| 9 | Ok | OK | NEX-02 | |
-| 10 | NEX-05 | NEX-04 | Ok | |
-| 11 | Ok | NEX-01 | Ok | |
-| 12 | Ok | NEX-01 | Ok | |
-
-## Streaming Mode / Synthetic / Local
-
-| Query | Direct | Spark | Flink | Apex |
-| ----: | ------ | ------ | ------ | ------ |
-| 0 | Ok | | | |
-| 1 | Ok | | | |
-| 2 | Ok | | | |
-| 3 | NEX-07 | | | |
-| 4 | Ok | | | |
-| 5 | Ok | | | |
-| 6 | Ok | | | |
-| 7 | Ok | | | |
-| 8 | Ok | | | |
-| 9 | Ok | | | |
-| 10 | NEX-05 | | | |
-| 11 | Ok | | | |
-| 12 | Ok | | | |
+Open issues are tracked [here](../../../../../issues):
+
+## Batch / Synthetic / Local
+
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- |
+| 0 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 1 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 2 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 3 | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) | [#7](../../../../../issues/7) |
+| 4 | ok | ok | [#2](../../../../../issues/2) | ok |
+| 5 | ok | [#3](../../../../../issues/3) | ok | ok |
+| 6 | ok | ok | [#2](../../../../../issues/2) | ok |
+| 7 | ok | [#1](../../../../../issues/1) | ok | [#24](../../../../../issues/24) |
+| 8 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 9 | ok | ok | [#2](../../../../../issues/2) | ok |
+| 10 | [#5](../../../../../issues/5) | [#4](../../../../../issues/4) | ok | ok |
+| 11 | ok | [#1](../../../../../issues/1) | ok | ok |
+| 12 | ok | [#1](../../../../../issues/1) | ok | ok |
+
+## Streaming / Synthetic / Local
+
+| Query | Direct | Spark | Flink | Apex |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+| 0 | ok | | | ok |
+| 1 | ok | | | ok |
+| 2 | ok | | | ok |
+| 3 | [#7](../../../../../issues/7) | | | [#7](../../../../../issues/7) |
+| 4 | ok | | | ok |
+| 5 | ok | | | ok |
+| 6 | ok | | | ok |
+| 7 | ok | | | ? |
+| 8 | ok | | | ok |
+| 9 | ok | | | ok |
+| 10 | [#5](../../../../../issues/5) | | | ? |
+| 11    | ok                             |                                |                                | ok                             |
+| 12    | ok                             |                                |                                | ok                             |
+
+## Batch / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow | Spark | Flink | Apex |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+| 0 | | | | |
+
+## Streaming / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow | Spark | Flink | Apex |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+| 0 | | | | |
# Running Nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 27abb0e..0ecc298 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -28,7 +28,7 @@
</parent>
<artifactId>beam-integration-java</artifactId>
- <name>Apache Beam :: Integration Tests :: Java All</name>
+ <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
<packaging>jar</packaging>
@@ -37,6 +37,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.2.0</flink.version>
<spark.version>1.6.3</spark.version>
+ <apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version>
<skipITs>true</skipITs>
</properties>
@@ -207,6 +208,30 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-apex</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>${apex.kryo.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${apex.codehaus.jackson.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${apex.codehaus.jackson.version}</version>
+ <scope>runtime</scope>
+ </dependency>
<!-- IOs -->
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
index ea46082..3b8993a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
@@ -39,11 +39,6 @@ public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkAp
}
@Override
- protected boolean canMonitor() {
- return false;
- }
-
- @Override
protected void invokeBuilderForPublishOnlyPipeline(
PipelineBuilder builder) {
builder.build(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
index c70e41e..0119bbc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
@@ -41,11 +41,6 @@ class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirec
}
@Override
- protected boolean canMonitor() {
- return true;
- }
-
- @Override
protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
throw new UnsupportedOperationException(
"Cannot use --pubSubMode=COMBINED with DirectRunner");
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
index 8e22917..95ab1ad 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
@@ -37,11 +37,6 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark
}
@Override
- protected boolean canMonitor() {
- return true;
- }
-
- @Override
protected void invokeBuilderForPublishOnlyPipeline(
PipelineBuilder builder) {
builder.build(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
index 135d428..f4bfb1e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
@@ -60,11 +60,6 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl
}
@Override
- protected boolean canMonitor() {
- return true;
- }
-
- @Override
protected String getJobId(PipelineResult job) {
return ((DataflowPipelineJob) job).getJobId();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 8d4c1f1..d311dc4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -158,11 +158,6 @@ public abstract class NexmarkRunner<OptionT extends Options> {
protected abstract int maxNumWorkers();
/**
- * Return true if runner can monitor running jobs.
- */
- protected abstract boolean canMonitor();
-
- /**
* Return the current value for a long counter, or -1 if can't be retrieved.
*/
protected long getLong(PipelineResult job, Aggregator<Long, Long> aggregator) {
@@ -1089,10 +1084,6 @@ public abstract class NexmarkRunner<OptionT extends Options> {
*/
@Nullable
public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
- if (options.getMonitorJobs() && !canMonitor()) {
- throw new RuntimeException("Cannot use --monitorJobs with this runner since it does not "
- + "support monitoring.");
- }
if (options.getManageResources() && !options.getMonitorJobs()) {
throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
index 32fee30..30ae9ca 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
@@ -37,11 +37,6 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark
}
@Override
- protected boolean canMonitor() {
- return true;
- }
-
- @Override
protected void invokeBuilderForPublishOnlyPipeline(
PipelineBuilder builder) {
builder.build(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
index 9020494..2c9fb9b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
@@ -67,7 +67,6 @@ class Query5 extends NexmarkQuery {
// Count the number of bids per auction id.
.apply(Count.<Long>perElement())
- //TODO replace by simple key
// We'll want to keep all auctions with the maximal number of bids.
// Start by lifting each into a singleton list.
.apply(name + ".ToSingletons",
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties
index 9d20aea..bc09794 100644
--- a/integration/java/nexmark/src/main/resources/log4j.properties
+++ b/integration/java/nexmark/src/main/resources/log4j.properties
@@ -35,8 +35,17 @@ log4j.logger.org.apache.spark=WARN
log4j.logger.org.spark-project=WARN
log4j.logger.io.netty=INFO
+# Settings to quiet flink logs
log4j.logger.org.apache.flink=WARN
+# Settings to quiet apex logs
+log4j.logger.org.apache.beam.runners.apex=INFO
+log4j.logger.com.datatorrent=ERROR
+log4j.logger.org.apache.hadoop.metrics2=WARN
+log4j.logger.org.apache.commons=WARN
+log4j.logger.org.apache.hadoop.security=WARN
+log4j.logger.org.apache.hadoop.util=WARN
+
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
index 02761d6..35b3aed 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
@@ -87,8 +87,8 @@ public class UnboundedEventSourceTest {
Generator modelGenerator = new Generator(config);
EventIdChecker checker = new EventIdChecker();
- Pipeline p = TestPipeline.create();
- PipelineOptions options = p.getOptions();
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ Pipeline p = TestPipeline.create(options);
UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
UnboundedReader<Event> reader = source.createReader(options, null);