You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:27 UTC
[19/55] [abbrv] beam git commit: Activate monitoring on
NexmarkSparkRunner and on specific runners
Activate monitoring on NexmarkSparkRunner and on specific runners
issue #28
Fix compilation issue after rebase + make checkstyle happy again
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a1fe33bc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a1fe33bc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a1fe33bc
Branch: refs/heads/master
Commit: a1fe33bc122b26960697c87620ca0dc2ed522e56
Parents: a095e40
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Wed Mar 15 15:25:41 2017 +0100
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:27 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/pom.xml | 32 ++++++++++----------
.../integration/nexmark/NexmarkApexRunner.java | 2 --
.../nexmark/NexmarkDirectRunner.java | 5 ---
.../integration/nexmark/NexmarkFlinkRunner.java | 12 +-------
.../nexmark/NexmarkGoogleDriver.java | 2 --
.../nexmark/NexmarkGoogleRunner.java | 2 +-
.../beam/integration/nexmark/NexmarkRunner.java | 13 ++++----
.../integration/nexmark/NexmarkSparkDriver.java | 4 +--
.../integration/nexmark/NexmarkSparkRunner.java | 11 +------
.../beam/integration/nexmark/NexmarkUtils.java | 3 +-
.../apache/beam/integration/nexmark/Query5.java | 3 +-
11 files changed, 31 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 07d14c2..febd96d 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -179,28 +179,28 @@
<artifactId>beam-runners-flink_2.10</artifactId>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.apache.flink</groupId>-->
- <!--<artifactId>flink-shaded-hadoop2</artifactId>-->
- <!--<version>${flink.version}</version>-->
- <!--<scope>provided</scope>-->
- <!--</dependency>-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop2</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Spark runner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.apache.spark</groupId>-->
- <!--<artifactId>spark-core_2.10</artifactId>-->
- <!--<version>${spark.version}</version>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.apache.spark</groupId>-->
- <!--<artifactId>spark-streaming_2.10</artifactId>-->
- <!--<version>${spark.version}</version>-->
- <!--</dependency>-->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
<!-- Apex runner -->
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
index f2da1c7..ea46082 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
@@ -18,8 +18,6 @@
package org.apache.beam.integration.nexmark;
import javax.annotation.Nullable;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.PipelineResult;
/**
* Run a query using the Apex runner.
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
index ee234b1..c70e41e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
@@ -17,11 +17,6 @@
*/
package org.apache.beam.integration.nexmark;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.PipelineResult;
-
/**
* Run a single query using the Direct Runner.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
index a8b4401..8e22917 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
@@ -17,10 +17,6 @@
*/
package org.apache.beam.integration.nexmark;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.flink.FlinkRunnerResult;
-import org.apache.beam.sdk.PipelineResult;
-
/**
* Run a query using the Flink runner.
*/
@@ -42,7 +38,7 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark
@Override
protected boolean canMonitor() {
- return false;
+ return true;
}
@Override
@@ -56,12 +52,6 @@ public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.Nexmark
throw new UnsupportedOperationException();
}
- @Override
- @Nullable
- protected NexmarkPerf monitor(NexmarkQuery query) {
- return null;
- }
-
public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
super(options);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
index 67c4aeb..50c2a7c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.integration.nexmark;
-import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
index c78bb42..135d428 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
@@ -66,7 +66,7 @@ class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogl
@Override
protected String getJobId(PipelineResult job) {
- return ((DataflowPipelineJob)job).getJobId();
+ return ((DataflowPipelineJob) job).getJobId();
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 5365dbe..8d4c1f1 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -53,8 +53,6 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
-import static org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
-
/**
* Run a single Nexmark query using a given configuration.
*/
@@ -203,7 +201,8 @@ public abstract class NexmarkRunner<OptionT extends Options> {
* Find a 'steady state' events/sec from {@code snapshots} and
* store it in {@code perf} if found.
*/
- protected void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+ protected void captureSteadyState(NexmarkPerf perf,
+ List<NexmarkPerf.ProgressSnapshot> snapshots) {
if (!options.isStreaming()) {
return;
}
@@ -365,7 +364,9 @@ public abstract class NexmarkRunner<OptionT extends Options> {
return perf;
}
- String getJobId(PipelineResult job){return "";}
+ String getJobId(PipelineResult job) {
+ return "";
+ }
// TODO specific to dataflow, see if we can find an equivalent
/*
@@ -926,8 +927,8 @@ public abstract class NexmarkRunner<OptionT extends Options> {
new TableFieldSchema().setName("index").setType("INTEGER"),
new TableFieldSchema().setName("value").setType("STRING")))));
NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
- BigQueryIO.Write.Bound io =
- BigQueryIO.Write.to(tableSpec)
+ BigQueryIO.Write io =
+ BigQueryIO.write().to(tableSpec)
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
index 1ea963d..a46d38a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark;
import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/**
@@ -39,7 +38,8 @@ class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOp
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(NexmarkSparkOptions.class);
- options.setRunner(SparkRunner.class);
+// options.setRunner(org.apache.beam.runners.spark.SparkRunner.class);
+ options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class);
NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
new NexmarkSparkDriver().runAll(options, runner);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
index 109e8a0..32fee30 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
@@ -17,10 +17,6 @@
*/
package org.apache.beam.integration.nexmark;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.SparkPipelineResult;
-import org.apache.beam.sdk.PipelineResult;
-
/**
* Run a query using the Spark runner.
*/
@@ -42,7 +38,7 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark
@Override
protected boolean canMonitor() {
- return false;
+ return true;
}
@Override
@@ -56,11 +52,6 @@ public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.Nexmark
throw new UnsupportedOperationException();
}
- @Override
- @Nullable
- protected NexmarkPerf monitor(NexmarkQuery query) {
- return null;
- }
public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
super(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index 6588f85..8f4cb22 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
@@ -325,8 +324,8 @@ public class NexmarkUtils {
* Setup pipeline with codes and some other options.
*/
public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
- PipelineRunner<?> runner = p.getRunner();
//TODO Ismael check
+// PipelineRunner<?> runner = p.getRunner();
// if (runner instanceof DirectRunner) {
// // Disable randomization of output since we want to check batch and streaming match the
// // model both locally and on the cloud.
http://git-wip-us.apache.org/repos/asf/beam/blob/a1fe33bc/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
index 7001986..9020494 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
@@ -67,7 +67,8 @@ class Query5 extends NexmarkQuery {
// Count the number of bids per auction id.
.apply(Count.<Long>perElement())
- // We'll want to keep all auctions with the maximal number of bids.
+ //TODO replace by simple key
+ // We'll want to keep all auctions with the maximal number of bids.
// Start by lifting each into a singleton list.
.apply(name + ".ToSingletons",
ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {