You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:27 UTC

[19/55] [abbrv] beam git commit: Activate monitoring on NexmarkSparkRunner and on specific runners

Activate monitoring on NexmarkSparkRunner and on specific runners

issue #28

Fix compilation issue after rebase and make Checkstyle happy again


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1fe33bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1fe33bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1fe33bc

Branch: refs/heads/master
Commit: a1fe33bc122b26960697c87620ca0dc2ed522e56
Parents: a095e40
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Wed Mar 15 15:25:41 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                | 32 ++++++++++----------
 .../integration/nexmark/NexmarkApexRunner.java  |  2 --
 .../nexmark/NexmarkDirectRunner.java            |  5 ---
 .../integration/nexmark/NexmarkFlinkRunner.java | 12 +-------
 .../nexmark/NexmarkGoogleDriver.java            |  2 --
 .../nexmark/NexmarkGoogleRunner.java            |  2 +-
 .../beam/integration/nexmark/NexmarkRunner.java | 13 ++++----
 .../integration/nexmark/NexmarkSparkDriver.java |  4 +--
 .../integration/nexmark/NexmarkSparkRunner.java | 11 +------
 .../beam/integration/nexmark/NexmarkUtils.java  |  3 +-
 .../apache/beam/integration/nexmark/Query5.java |  3 +-
 11 files changed, 31 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 07d14c2..febd96d 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -179,28 +179,28 @@
       <artifactId>beam-runners-flink_2.10</artifactId>
     </dependency>
 
-    <!--<dependency>-->
-      <!--<groupId>org.apache.flink</groupId>-->
-      <!--<artifactId>flink-shaded-hadoop2</artifactId>-->
-      <!--<version>${flink.version}</version>-->
-      <!--<scope>provided</scope>-->
-    <!--</dependency>-->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-shaded-hadoop2</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Spark runner -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-spark</artifactId>
     </dependency>
-    <!--<dependency>-->
-      <!--<groupId>org.apache.spark</groupId>-->
-      <!--<artifactId>spark-core_2.10</artifactId>-->
-      <!--<version>${spark.version}</version>-->
-    <!--</dependency>-->
-    <!--<dependency>-->
-      <!--<groupId>org.apache.spark</groupId>-->
-      <!--<artifactId>spark-streaming_2.10</artifactId>-->
-      <!--<version>${spark.version}</version>-->
-    <!--</dependency>-->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
 
     <!-- Apex runner -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
index f2da1c7..ea46082 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
@@ -18,8 +18,6 @@
 package org.apache.beam.integration.nexmark;
 
 import javax.annotation.Nullable;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.PipelineResult;
 
 /**
  * Run a query using the Apex runner.

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
index ee234b1..c70e41e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
@@ -17,11 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.PipelineResult;
-
 /**
  * Run a single query using the Direct Runner.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
index a8b4401..8e22917 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
-import org.apache.beam.runners.flink.FlinkRunnerResult;
-import org.apache.beam.sdk.PipelineResult;
-
 /**
  * Run a query using the Flink runner.
  */
@@ -42,7 +38,7 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark
 
   @Override
   protected boolean canMonitor() {
-    return false;
+    return true;
   }
 
   @Override
@@ -56,12 +52,6 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  @Nullable
-  protected NexmarkPerf monitor(NexmarkQuery query) {
-    return null;
-  }
-
   public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
     super(options);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
index 67c4aeb..50c2a7c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
index c78bb42..135d428 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
@@ -66,7 +66,7 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl
 
   @Override
   protected String getJobId(PipelineResult job) {
-    return ((DataflowPipelineJob)job).getJobId();
+    return ((DataflowPipelineJob) job).getJobId();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 5365dbe..8d4c1f1 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -53,8 +53,6 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 
-import static org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
-
 /**
  * Run a single Nexmark query using a given configuration.
  */
@@ -203,7 +201,8 @@ public abstract class NexmarkRunner<OptionT extends Options> {
    * Find a 'steady state' events/sec from {@code snapshots} and
    * store it in {@code perf} if found.
    */
-  protected void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+  protected void captureSteadyState(NexmarkPerf perf,
+                                    List<NexmarkPerf.ProgressSnapshot> snapshots) {
     if (!options.isStreaming()) {
       return;
     }
@@ -365,7 +364,9 @@ public abstract class NexmarkRunner<OptionT extends Options> {
     return perf;
   }
 
-  String getJobId(PipelineResult job){return "";}
+  String getJobId(PipelineResult job) {
+    return "";
+  }
 
   // TODO specific to dataflow, see if we can find an equivalent
 /*
@@ -926,8 +927,8 @@ public abstract class NexmarkRunner<OptionT extends Options> {
                                       new TableFieldSchema().setName("index").setType("INTEGER"),
                                       new TableFieldSchema().setName("value").setType("STRING")))));
     NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
-    BigQueryIO.Write.Bound io =
-        BigQueryIO.Write.to(tableSpec)
+    BigQueryIO.Write io =
+        BigQueryIO.write().to(tableSpec)
                         .withSchema(tableSchema)
                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
index 1ea963d..a46d38a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**
@@ -39,7 +38,8 @@ class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOp
                 PipelineOptionsFactory.fromArgs(args)
                         .withValidation()
                         .as(NexmarkSparkOptions.class);
-        options.setRunner(SparkRunner.class);
+//        options.setRunner(org.apache.beam.runners.spark.SparkRunner.class);
+        options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class);
         NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
         new NexmarkSparkDriver().runAll(options, runner);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
index 109e8a0..32fee30 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.SparkPipelineResult;
-import org.apache.beam.sdk.PipelineResult;
-
 /**
  * Run a query using the Spark runner.
  */
@@ -42,7 +38,7 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark
 
     @Override
     protected boolean canMonitor() {
-        return false;
+        return true;
     }
 
     @Override
@@ -56,11 +52,6 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    @Nullable
-    protected NexmarkPerf monitor(NexmarkQuery query) {
-        return null;
-    }
 
     public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
         super(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 6588f85..8f4cb22 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -325,8 +324,8 @@ public class NexmarkUtils {
    * Setup pipeline with codes and some other options.
    */
   public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
-    PipelineRunner<?> runner = p.getRunner();
     //TODO Ismael check
+//    PipelineRunner<?> runner = p.getRunner();
 //    if (runner instanceof DirectRunner) {
 //      // Disable randomization of output since we want to check batch and streaming match the
 //      // model both locally and on the cloud.

http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
index 7001986..9020494 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
@@ -67,7 +67,8 @@ class Query5 extends NexmarkQuery {
         // Count the number of bids per auction id.
         .apply(Count.<Long>perElement())
 
-        // We'll want to keep all auctions with the maximal number of bids.
+      //TODO replace by simple key
+      // We'll want to keep all auctions with the maximal number of bids.
         // Start by lifting each into a singleton list.
         .apply(name + ".ToSingletons",
             ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {