You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:25 UTC

[17/55] [abbrv] beam git commit: Fix Apex driver and update execution matrix

Fix Apex driver and update execution matrix


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9ce9bf07
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9ce9bf07
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9ce9bf07

Branch: refs/heads/master
Commit: 9ce9bf076032e1c9aeb3a6dce806ad4b96127157
Parents: 1bd5735
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Mar 21 18:29:20 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/README.md              | 109 +++++++++++--------
 integration/java/nexmark/pom.xml                |  27 ++++-
 .../integration/nexmark/NexmarkApexRunner.java  |   5 -
 .../nexmark/NexmarkDirectRunner.java            |   5 -
 .../integration/nexmark/NexmarkFlinkRunner.java |   5 -
 .../nexmark/NexmarkGoogleRunner.java            |   5 -
 .../beam/integration/nexmark/NexmarkRunner.java |   9 --
 .../integration/nexmark/NexmarkSparkRunner.java |   5 -
 .../apache/beam/integration/nexmark/Query5.java |   1 -
 .../nexmark/src/main/resources/log4j.properties |   9 ++
 .../nexmark/UnboundedEventSourceTest.java       |   4 +-
 11 files changed, 100 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/README.md
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/README.md b/integration/java/nexmark/README.md
index 4c08c28..7a91ab2 100644
--- a/integration/java/nexmark/README.md
+++ b/integration/java/nexmark/README.md
@@ -122,63 +122,80 @@ Number of events generators
 
     --numEventGenerators=4
 
-## Flink specific configuration
+## Apex specific configuration
 
---suite=SMOKE --manageResources=false --monitorJobs=false \
---flinkMaster=local
+--suite=SMOKE --manageResources=false --monitorJobs=true
 
 ## Direct specific configuration
 
---suite=SMOKE --manageResources=false --monitorJobs=false \
+--suite=SMOKE --manageResources=false --monitorJobs=true \
 --enforceEncodability=false --enforceImmutability=false
 
+## Flink specific configuration
+
+--suite=SMOKE --manageResources=false --monitorJobs=true \
+--flinkMaster=local
+
 ## Spark specific configuration
 
---suite=SMOKE
---manageResources=false --monitorJobs=false --sparkMaster=local
--Dspark.ui.enabled=false
--DSPARK_LOCAL_IP=localhost
--Dsun.io.serialization.extendedDebugInfo=true
+--suite=SMOKE --manageResources=false --monitorJobs=true --sparkMaster=local \
+-Dspark.ui.enabled=false -DSPARK_LOCAL_IP=localhost -Dsun.io.serialization.extendedDebugInfo=true
 
 # Current Status
 
-Open issues are currently opened on [github](https://github.com/iemejia/beam/issues):
-
-## Batch Mode / Synthetic / Local
-
-| Query | Direct | Spark  | Flink  | Apex   |
-| ----: | ------ | ------ | ------ | ------ |
-|     0 | Ok     |   #1   | Ok     |        |
-|     1 | Ok     |   #1   | Ok     |        |
-|     2 | Ok     | NEX-01 | Ok     |        |
-|     3 | NEX-07 | NEX-07 | NEX-07 |        |
-|     4 | Ok     | Ok     | NEX-02 |        |
-|     5 | Ok     | NEX-03 | Ok     |        |
-|     6 | Ok     | OK     | NEX-02 |        |
-|     7 | Ok     | NEX-01 | Ok     |        |
-|     8 | Ok     | NEX-01 | Ok     |        |
-|     9 | Ok     | OK     | NEX-02 |        |
-|    10 | NEX-05 | NEX-04 | Ok     |        |
-|    11 | Ok     | NEX-01 | Ok     |        |
-|    12 | Ok     | NEX-01 | Ok     |        |
-
-## Streaming Mode / Synthetic / Local
-
-| Query | Direct | Spark  | Flink  | Apex   |
-| ----: | ------ | ------ | ------ | ------ |
-|     0 | Ok     |        |        |        |
-|     1 | Ok     |        |        |        |
-|     2 | Ok     |        |        |        |
-|     3 | NEX-07 |        |        |        |
-|     4 | Ok     |        |        |        |
-|     5 | Ok     |        |        |        |
-|     6 | Ok     |        |        |        |
-|     7 | Ok     |        |        |        |
-|     8 | Ok     |        |        |        |
-|     9 | Ok     |        |        |        |
-|    10 | NEX-05 |        |        |        |
-|    11 | Ok     |        |        |        |
-|    12 | Ok     |        |        |        |
+Open issues are tracked [here](https://github.com/iemejia/beam/issues):
+
+## Batch / Synthetic / Local
+
+| Query | Direct                         | Spark                          | Flink                          | Apex                            |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------- |
+|     0 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|     1 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|     2 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|     3 | [#7](../../../../../issues/7)  | [#7](../../../../../issues/7)  | [#7](../../../../../issues/7)  | [#7](../../../../../issues/7)   |
+|     4 | ok                             | ok                             | [#2](../../../../../issues/2)  | ok                              |
+|     5 | ok                             | [#3](../../../../../issues/3)  | ok                             | ok                              |
+|     6 | ok                             | ok                             | [#2](../../../../../issues/2)  | ok                              |
+|     7 | ok                             | [#1](../../../../../issues/1)  | ok                             | [#24](../../../../../issues/24) |
+|     8 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|     9 | ok                             | ok                             | [#2](../../../../../issues/2)  | ok                              |
+|    10 | [#5](../../../../../issues/5)  | [#4](../../../../../issues/4)  | ok                             | ok                              |
+|    11 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+|    12 | ok                             | [#1](../../../../../issues/1)  | ok                             | ok                              |
+
+## Streaming / Synthetic / Local
+
+| Query | Direct                         | Spark                          | Flink                          | Apex                           |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+|     0 | ok                             |                                |                                | ok                             |
+|     1 | ok                             |                                |                                | ok                             |
+|     2 | ok                             |                                |                                | ok                             |
+|     3 | [#7](../../../../../issues/7)  |                                |                                | [#7](../../../../../issues/7)  |
+|     4 | ok                             |                                |                                | ok                             |
+|     5 | ok                             |                                |                                | ok                             |
+|     6 | ok                             |                                |                                | ok                             |
+|     7 | ok                             |                                |                                | ?                              |
+|     8 | ok                             |                                |                                | ok                             |
+|     9 | ok                             |                                |                                | ok                             |
+|    10 | [#5](../../../../../issues/5)  |                                |                                | ?                              |
+|    11 | ok                             |                                |                                | ok                             |
+|    12 | ok                             |                                |                                | ok                             |
+
+## Batch / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow                       | Spark                          | Flink                          | Apex                           |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+|     0 |                                |                                |                                |                                |
+
+## Streaming / Synthetic / Cluster
+
+TODO
+
+| Query | Dataflow                       | Spark                          | Flink                          | Apex                           |
+| ----: | ------------------------------ | ------------------------------ | ------------------------------ | ------------------------------ |
+|     0 |                                |                                |                                |                                |
 
 # Running Nexmark
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 27abb0e..0ecc298 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -28,7 +28,7 @@
   </parent>
 
   <artifactId>beam-integration-java</artifactId>
-  <name>Apache Beam :: Integration Tests :: Java All</name>
+  <name>Apache Beam :: Integration Tests :: Java :: Nexmark</name>
 
   <packaging>jar</packaging>
 
@@ -37,6 +37,7 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <flink.version>1.2.0</flink.version>
     <spark.version>1.6.3</spark.version>
+    <apex.codehaus.jackson.version>1.9.3</apex.codehaus.jackson.version>
     <skipITs>true</skipITs>
   </properties>
 
@@ -207,6 +208,30 @@
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-apex</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>${apex.kryo.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>${apex.codehaus.jackson.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>${apex.codehaus.jackson.version}</version>
+      <scope>runtime</scope>
+    </dependency>
 
     <!-- IOs -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
index ea46082..3b8993a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
@@ -39,11 +39,6 @@ public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkAp
   }
 
   @Override
-  protected boolean canMonitor() {
-    return false;
-  }
-
-  @Override
   protected void invokeBuilderForPublishOnlyPipeline(
       PipelineBuilder builder) {
     builder.build(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
index c70e41e..0119bbc 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
@@ -41,11 +41,6 @@ class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirec
   }
 
   @Override
-  protected boolean canMonitor() {
-    return true;
-  }
-
-  @Override
   protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
     throw new UnsupportedOperationException(
         "Cannot use --pubSubMode=COMBINED with DirectRunner");

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
index 8e22917..95ab1ad 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
@@ -37,11 +37,6 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark
   }
 
   @Override
-  protected boolean canMonitor() {
-    return true;
-  }
-
-  @Override
   protected void invokeBuilderForPublishOnlyPipeline(
       PipelineBuilder builder) {
     builder.build(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
index 135d428..f4bfb1e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
@@ -60,11 +60,6 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl
   }
 
   @Override
-  protected boolean canMonitor() {
-    return true;
-  }
-
-  @Override
   protected String getJobId(PipelineResult job) {
     return ((DataflowPipelineJob) job).getJobId();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 8d4c1f1..d311dc4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -158,11 +158,6 @@ public abstract class NexmarkRunner<OptionT extends Options> {
   protected abstract int maxNumWorkers();
 
   /**
-   * Return true if runner can monitor running jobs.
-   */
-  protected abstract boolean canMonitor();
-
-  /**
    * Return the current value for a long counter, or -1 if can't be retrieved.
    */
   protected long getLong(PipelineResult job, Aggregator<Long, Long> aggregator) {
@@ -1089,10 +1084,6 @@ public abstract class NexmarkRunner<OptionT extends Options> {
    */
   @Nullable
   public NexmarkPerf run(NexmarkConfiguration runConfiguration) {
-    if (options.getMonitorJobs() && !canMonitor()) {
-      throw new RuntimeException("Cannot use --monitorJobs with this runner since it does not "
-                                 + "support monitoring.");
-    }
     if (options.getManageResources() && !options.getMonitorJobs()) {
       throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
index 32fee30..30ae9ca 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
@@ -37,11 +37,6 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark
     }
 
     @Override
-    protected boolean canMonitor() {
-        return true;
-    }
-
-    @Override
     protected void invokeBuilderForPublishOnlyPipeline(
             PipelineBuilder builder) {
         builder.build(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
index 9020494..2c9fb9b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
@@ -67,7 +67,6 @@ class Query5 extends NexmarkQuery {
         // Count the number of bids per auction id.
         .apply(Count.<Long>perElement())
 
-      //TODO replace by simple key
       // We'll want to keep all auctions with the maximal number of bids.
         // Start by lifting each into a singleton list.
         .apply(name + ".ToSingletons",

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/resources/log4j.properties b/integration/java/nexmark/src/main/resources/log4j.properties
index 9d20aea..bc09794 100644
--- a/integration/java/nexmark/src/main/resources/log4j.properties
+++ b/integration/java/nexmark/src/main/resources/log4j.properties
@@ -35,8 +35,17 @@ log4j.logger.org.apache.spark=WARN
 log4j.logger.org.spark-project=WARN
 log4j.logger.io.netty=INFO
 
+# Settings to quiet flink logs
 log4j.logger.org.apache.flink=WARN
 
+# Settings to quiet apex logs
+log4j.logger.org.apache.beam.runners.apex=INFO
+log4j.logger.com.datatorrent=ERROR
+log4j.logger.org.apache.hadoop.metrics2=WARN
+log4j.logger.org.apache.commons=WARN
+log4j.logger.org.apache.hadoop.security=WARN
+log4j.logger.org.apache.hadoop.util=WARN
+
 # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
 log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
 log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

http://git-wip-us.apache.org/repos/asf/beam/blob/9ce9bf07/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
index 02761d6..35b3aed 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/UnboundedEventSourceTest.java
@@ -87,8 +87,8 @@ public class UnboundedEventSourceTest {
     Generator modelGenerator = new Generator(config);
 
     EventIdChecker checker = new EventIdChecker();
-    Pipeline p = TestPipeline.create();
-    PipelineOptions options = p.getOptions();
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    Pipeline p = TestPipeline.create(options);
     UnboundedEventSource source = new UnboundedEventSource(config, 1, 0, false);
     UnboundedReader<Event> reader = source.createReader(options, null);