You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:42 UTC

[34/55] [abbrv] beam git commit: Clean some code that is specific to Dataflow

Clean some code that is specific to Dataflow


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/77eabbaa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/77eabbaa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/77eabbaa

Branch: refs/heads/master
Commit: 77eabbaaddad88784c8ce2e775b4b8e8fea3f868
Parents: 902050b
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Fri May 5 15:19:07 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 .../beam/integration/nexmark/NexmarkRunner.java | 106 -------------------
 1 file changed, 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/77eabbaa/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index 6df76f0..935544e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -157,9 +157,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     this.options = options;
   }
 
-  // ================================================================================
-  // Overridden by each runner.
-  // ================================================================================
 
   /**
    * Is this query running in streaming mode?
@@ -414,7 +411,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
       perf.shutdownDelaySec = (now - resultEnd) / 1000.0;
     }
 
-    perf.jobId = getJobId(result);
     // As soon as available, try to capture cumulative cost at this point too.
 
     NexmarkPerf.ProgressSnapshot snapshot = new NexmarkPerf.ProgressSnapshot();
@@ -429,105 +425,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
     return perf;
   }
 
-  private String getJobId(PipelineResult job) {
-    return "";
-  }
-
-  // TODO specific to dataflow, see if we can find an equivalent
-/*
-  protected MetricType getMetricType(MetricUpdate metric) {
-    String metricName = metric.getKey().metricName().name();
-    if (metricName.endsWith("windmill-system-watermark")) {
-      return MetricType.SYSTEM_WATERMARK;
-    } else if (metricName.endsWith("windmill-data-watermark")) {
-      return MetricType.DATA_WATERMARK;
-    } else {
-      return MetricType.OTHER;
-    }
-  }
-*/
-
-  /**
-   * Check that watermarks are not too far behind.
-   *
-   * <p>Returns a list of errors detected.
-   */
-  // TODO specific to dataflow, see if we can find an equivalent
-  /*
-  private List<String> checkWatermarks(DataflowPipelineJob job, long startMsSinceEpoch) {
-    long now = System.currentTimeMillis();
-    List<String> errors = new ArrayList<>();
-    try {
-      JobMetrics metricResponse = job.getDataflowClient()
-                                     .projects()
-                                     .jobs()
-                                     .getMetrics(job.getProjectId(), job.getJobId())
-                                     .execute();
-          List<MetricUpdate> metrics = metricResponse.getMetrics();
-
-
-
-      if (metrics != null) {
-        boolean foundWatermarks = false;
-        for (MetricUpdate metric : metrics) {
-          MetricType type = getMetricType(metric);
-          if (type == MetricType.OTHER) {
-            continue;
-          }
-          foundWatermarks = true;
-          @SuppressWarnings("unchecked")
-          BigDecimal scalar = (BigDecimal) metric.getScalar();
-          if (scalar.signum() < 0) {
-            continue;
-          }
-          Instant value =
-                  new Instant(scalar.divideToIntegralValue(new BigDecimal(1000)).longValueExact());
-          Instant updateTime = Instant.parse(metric.getUpdateTime());
-
-          if (options.getWatermarkValidationDelaySeconds() == null
-                  || now > startMsSinceEpoch
-                  + Duration.standardSeconds(options.getWatermarkValidationDelaySeconds())
-                  .getMillis()) {
-            Duration threshold = null;
-            if (type == MetricType.SYSTEM_WATERMARK && options.getMaxSystemLagSeconds() != null) {
-              threshold = Duration.standardSeconds(options.getMaxSystemLagSeconds());
-            } else if (type == MetricType.DATA_WATERMARK
-                    && options.getMaxDataLagSeconds() != null) {
-              threshold = Duration.standardSeconds(options.getMaxDataLagSeconds());
-            }
-
-            if (threshold != null && value.isBefore(updateTime.minus(threshold))) {
-              String msg = String.format("High lag for %s: %s vs %s (allowed lag of %s)",
-                      metric.getKey().metricName().name(), value, updateTime, threshold);
-              errors.add(msg);
-              NexmarkUtils.console(msg);
-            }
-          }
-        }
-        if (!foundWatermarks) {
-          NexmarkUtils.console("No known watermarks in update: " + metrics);
-          if (now > startMsSinceEpoch + Duration.standardMinutes(5).getMillis()) {
-            errors.add("No known watermarks found.  Metrics were " + metrics);
-          }
-        }
-      }
-    } catch (IOException e) {
-      NexmarkUtils.console("Warning: failed to get JobMetrics: " + e);
-    }
-
-    return errors;
-  }
-*/
-
-  // TODO specific to dataflow, see if we can find an equivalent
-/*
-  enum MetricType {
-    SYSTEM_WATERMARK,
-    DATA_WATERMARK,
-    OTHER
-  }
-*/
-
   /**
    * Build and run a pipeline using specified options.
    */
@@ -643,9 +540,6 @@ public class NexmarkRunner<OptionT extends NexmarkOptions> {
               String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
         }
 
-        // TODO specific to dataflow, see if we can find an equivalent
-//        errors.addAll(checkWatermarks(job, startMsSinceEpoch));
-
         if (waitingForShutdown) {
           try {
             job.cancel();