You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:42 UTC
[34/55] [abbrv] beam git commit: Clean some code that is specific to
Dataflow
Clean some code that is specific to Dataflow
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77eabbaa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77eabbaa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77eabbaa
Branch: refs/heads/master
Commit: 77eabbaaddad88784c8ce2e775b4b8e8fea3f868
Parents: 902050b
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Fri May 5 15:19:07 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
.../beam/integration/nexmark/NexmarkRunner.java | 106 -------------------
1 file changed, 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/77eabbaa/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 6df76f0..935544e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -157,9 +157,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
this.options = options;
}
- // ================================================================================
- // Overridden by each runner.
- // ================================================================================
/**
* Is this query running in streaming mode?
@@ -414,7 +411,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
}
- perf.jobId = getJobId(result);
// As soon as available, try to capture cumulative cost at this point too.
NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
@@ -429,105 +425,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
return perf;
}
- private String getJobId(PipelineResult job) {
- return "";
- }
-
- // TODO specific to dataflow, see if we can find an equivalent
-/*
- protected MetricType getMetricType(MetricUpdate metric) {
- String metricName = metric.getKey().metricName().name();
- if (metricName.endsWith("windmill-system-watermark")) {
- return MetricType.SYSTEM_WATERMARK;
- } else if (metricName.endsWith("windmill-data-watermark")) {
- return MetricType.DATA_WATERMARK;
- } else {
- return MetricType.OTHER;
- }
- }
-*/
-
- /**
- * Check that watermarks are not too far behind.
- *
- * <p>Returns a list of errors detected.
- */
- // TODO specific to dataflow, see if we can find an equivalent
- /*
- private List<String> checkWatermarks(DataflowPipelineJob job, long startMsSinceEpoch) {
- long now = System.currentTimeMillis();
- List<String> errors = new ArrayList<>();
- try {
- JobMetrics metricResponse = job.getDataflowClient()
- .projects()
- .jobs()
- .getMetrics(job.getProjectId(), job.getJobId())
- .execute();
- List<MetricUpdate> metrics = metricResponse.getMetrics();
-
-
-
- if (metrics != null) {
- boolean foundWatermarks = false;
- for (MetricUpdate metric : metrics) {
- MetricType type = getMetricType(metric);
- if (type == MetricType.OTHER) {
- continue;
- }
- foundWatermarks = true;
- @SuppressWarnings("unchecked")
- BigDecimal scalar = (BigDecimal) metric.getScalar();
- if (scalar.signum() < 0) {
- continue;
- }
- Instant value =
- new Instant(scalar.divideToIntegralValue(new BigDecimal(1000)).longValueExact());
- Instant updateTime = Instant.parse(metric.getUpdateTime());
-
- if (options.getWatermarkValidationDelaySeconds() == null
- || now > startMsSinceEpoch
- + Duration.standardSeconds(options.getWatermarkValidationDelaySeconds())
- .getMillis()) {
- Duration threshold = null;
- if (type == MetricType.SYSTEM_WATERMARK && options.getMaxSystemLagSeconds() != null) {
- threshold = Duration.standardSeconds(options.getMaxSystemLagSeconds());
- } else if (type == MetricType.DATA_WATERMARK
- && options.getMaxDataLagSeconds() != null) {
- threshold = Duration.standardSeconds(options.getMaxDataLagSeconds());
- }
-
- if (threshold != null && value.isBefore(updateTime.minus(threshold))) {
- String msg = String.format("High lag for %s: %s vs %s (allowed lag of %s)",
- metric.getKey().metricName().name(), value, updateTime, threshold);
- errors.add(msg);
- NexmarkUtils.console(msg);
- }
- }
- }
- if (!foundWatermarks) {
- NexmarkUtils.console("No known watermarks in update: " + metrics);
- if (now > startMsSinceEpoch + Duration.standardMinutes(5).getMillis()) {
- errors.add("No known watermarks found. Metrics were " + metrics);
- }
- }
- }
- } catch (IOException e) {
- NexmarkUtils.console("Warning: failed to get JobMetrics: " + e);
- }
-
- return errors;
- }
-*/
-
- // TODO specific to dataflow, see if we can find an equivalent
-/*
- enum MetricType {
- SYSTEM_WATERMARK,
- DATA_WATERMARK,
- OTHER
- }
-*/
-
/**
* Build and run a pipeline using specified options.
*/
@@ -643,9 +540,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
}
- // TODO specific to dataflow, see if we can find an equivalent
-// errors.addAll(checkWatermarks(job, startMsSinceEpoch));
-
if (waitingForShutdown) {
try {
job.cancel();