You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/17 17:26:47 UTC

[beam] branch master updated: [BEAM-6334] Add throwing exception in case of invalid state or timeout

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d116214  [BEAM-6334] Add throwing exception in case of invalid state or timeout
d116214 is described below

commit d1162143f79a80b0e4b3aacd67fa227036b34322
Author: Łukasz Gajowy <lu...@gmail.com>
AuthorDate: Thu Jan 17 18:26:36 2019 +0100

    [BEAM-6334] Add throwing exception in case of invalid state or timeout
---
 .../org/apache/beam/sdk/loadtests/JobFailure.java  | 93 ++++++++++++++++++++++
 .../org/apache/beam/sdk/loadtests/LoadTest.java    | 15 ++--
 .../apache/beam/sdk/loadtests/LoadTestOptions.java |  6 ++
 3 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java
new file mode 100644
index 0000000..aa498ea
--- /dev/null
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+import static java.lang.String.format;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.beam.sdk.PipelineResult;
+import org.joda.time.Duration;
+
+/** Class for detecting failures after {@link PipelineResult#waitUntilFinish(Duration)} unblocks. */
+class JobFailure {
+
+  private String cause;
+
+  private boolean requiresCancelling;
+
+  JobFailure(String cause, boolean requiresCancelling) {
+    this.cause = cause;
+    this.requiresCancelling = requiresCancelling;
+  }
+
+  static void handleFailure(final PipelineResult pipelineResult, final LoadTestResult testResult)
+      throws IOException {
+    Optional<JobFailure> failure = lookForFailure(pipelineResult, testResult);
+
+    if (failure.isPresent()) {
+      JobFailure jobFailure = failure.get();
+
+      if (jobFailure.requiresCancelling) {
+        pipelineResult.cancel();
+      }
+
+      throw new RuntimeException(jobFailure.cause);
+    }
+  }
+
+  private static Optional<JobFailure> lookForFailure(
+      PipelineResult pipelineResult, LoadTestResult testResult) {
+    PipelineResult.State state = pipelineResult.getState();
+
+    Optional<JobFailure> stateRelatedFailure = lookForInvalidState(state);
+
+    if (stateRelatedFailure.isPresent()) {
+      return stateRelatedFailure;
+    } else {
+      return lookForMetricResultFailure(testResult);
+    }
+  }
+
+  private static Optional<JobFailure> lookForMetricResultFailure(LoadTestResult testResult) {
+    if (testResult.getRuntime() == -1 || testResult.getTotalBytesCount() == -1) {
+      return of(new JobFailure("Invalid test results", false));
+    } else {
+      return empty();
+    }
+  }
+
+  private static Optional<JobFailure> lookForInvalidState(PipelineResult.State state) {
+    switch (state) {
+      case RUNNING:
+      case UNKNOWN:
+        return of(new JobFailure("Job timeout.", true));
+
+      case CANCELLED:
+      case FAILED:
+      case STOPPED:
+      case UPDATED:
+        return of(new JobFailure(format("Invalid job state: %s.", state.toString()), false));
+
+      default:
+        return empty();
+    }
+  }
+}
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index 61cb3a2..2f783d2 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.loadtests;
 
 import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.apache.beam.sdk.loadtests.JobFailure.handleFailure;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
 
 /**
  * Base class for all load tests. Provides common operations such as initializing source/step
@@ -85,17 +87,20 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
 
     loadTest();
 
-    PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
-
-    LoadTestResult testResult = LoadTestResult.create(result, metricsNamespace, testStartTime);
+    PipelineResult pipelineResult = pipeline.run();
+    pipelineResult.waitUntilFinish(Duration.standardMinutes(options.getLoadTestTimeout()));
 
+    LoadTestResult testResult =
+        LoadTestResult.create(pipelineResult, metricsNamespace, testStartTime);
     ConsoleResultPublisher.publish(testResult);
 
+    handleFailure(pipelineResult, testResult);
+
     if (options.getPublishToBigQuery()) {
       publishResultToBigQuery(testResult);
     }
-    return result;
+
+    return pipelineResult;
   }
 
   private void publishResultToBigQuery(LoadTestResult testResult) {
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
index 3e56d9a..e69b071 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
@@ -55,6 +55,12 @@ public interface LoadTestOptions extends PipelineOptions, ApplicationNameOptions
 
   void setBigQueryTable(String tableName);
 
+  @Description("Timeout for a load test expressed in minutes")
+  @Default.Integer(240)
+  Integer getLoadTestTimeout();
+
+  void setLoadTestTimeout(Integer timeout);
+
   static <T extends LoadTestOptions> T readFromArgs(String[] args, Class<T> optionsClass) {
     return PipelineOptionsFactory.fromArgs(args).withValidation().as(optionsClass);
   }