You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/17 17:26:47 UTC
[beam] branch master updated: [BEAM-6334] Add throwing exception in
case of invalid state or timeout
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d116214 [BEAM-6334] Add throwing exception in case of invalid state or timeout
d116214 is described below
commit d1162143f79a80b0e4b3aacd67fa227036b34322
Author: Ćukasz Gajowy <lu...@gmail.com>
AuthorDate: Thu Jan 17 18:26:36 2019 +0100
[BEAM-6334] Add throwing exception in case of invalid state or timeout
---
.../org/apache/beam/sdk/loadtests/JobFailure.java | 93 ++++++++++++++++++++++
.../org/apache/beam/sdk/loadtests/LoadTest.java | 15 ++--
.../apache/beam/sdk/loadtests/LoadTestOptions.java | 6 ++
3 files changed, 109 insertions(+), 5 deletions(-)
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java
new file mode 100644
index 0000000..aa498ea
--- /dev/null
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/JobFailure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.loadtests;
+
+import static java.lang.String.format;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.beam.sdk.PipelineResult;
+import org.joda.time.Duration;
+
+/** Class for detecting failures after {@link PipelineResult#waitUntilFinish(Duration)} unblocks. */
+class JobFailure {
+
+ private String cause;
+
+ private boolean requiresCancelling;
+
+ JobFailure(String cause, boolean requiresCancelling) {
+ this.cause = cause;
+ this.requiresCancelling = requiresCancelling;
+ }
+
+ static void handleFailure(final PipelineResult pipelineResult, final LoadTestResult testResult)
+ throws IOException {
+ Optional<JobFailure> failure = lookForFailure(pipelineResult, testResult);
+
+ if (failure.isPresent()) {
+ JobFailure jobFailure = failure.get();
+
+ if (jobFailure.requiresCancelling) {
+ pipelineResult.cancel();
+ }
+
+ throw new RuntimeException(jobFailure.cause);
+ }
+ }
+
+ private static Optional<JobFailure> lookForFailure(
+ PipelineResult pipelineResult, LoadTestResult testResult) {
+ PipelineResult.State state = pipelineResult.getState();
+
+ Optional<JobFailure> stateRelatedFailure = lookForInvalidState(state);
+
+ if (stateRelatedFailure.isPresent()) {
+ return stateRelatedFailure;
+ } else {
+ return lookForMetricResultFailure(testResult);
+ }
+ }
+
+ private static Optional<JobFailure> lookForMetricResultFailure(LoadTestResult testResult) {
+ if (testResult.getRuntime() == -1 || testResult.getTotalBytesCount() == -1) {
+ return of(new JobFailure("Invalid test results", false));
+ } else {
+ return empty();
+ }
+ }
+
+ private static Optional<JobFailure> lookForInvalidState(PipelineResult.State state) {
+ switch (state) {
+ case RUNNING:
+ case UNKNOWN:
+ return of(new JobFailure("Job timeout.", true));
+
+ case CANCELLED:
+ case FAILED:
+ case STOPPED:
+ case UPDATED:
+ return of(new JobFailure(format("Invalid job state: %s.", state.toString()), false));
+
+ default:
+ return empty();
+ }
+ }
+}
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
index 61cb3a2..2f783d2 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.loadtests;
import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.apache.beam.sdk.loadtests.JobFailure.handleFailure;
import java.io.IOException;
import java.util.Optional;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
/**
* Base class for all load tests. Provides common operations such as initializing source/step
@@ -85,17 +87,20 @@ abstract class LoadTest<OptionsT extends LoadTestOptions> {
loadTest();
- PipelineResult result = pipeline.run();
- result.waitUntilFinish();
-
- LoadTestResult testResult = LoadTestResult.create(result, metricsNamespace, testStartTime);
+ PipelineResult pipelineResult = pipeline.run();
+ pipelineResult.waitUntilFinish(Duration.standardMinutes(options.getLoadTestTimeout()));
+ LoadTestResult testResult =
+ LoadTestResult.create(pipelineResult, metricsNamespace, testStartTime);
ConsoleResultPublisher.publish(testResult);
+ handleFailure(pipelineResult, testResult);
+
if (options.getPublishToBigQuery()) {
publishResultToBigQuery(testResult);
}
- return result;
+
+ return pipelineResult;
}
private void publishResultToBigQuery(LoadTestResult testResult) {
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
index 3e56d9a..e69b071 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/LoadTestOptions.java
@@ -55,6 +55,12 @@ public interface LoadTestOptions extends PipelineOptions, ApplicationNameOptions
void setBigQueryTable(String tableName);
+ @Description("Timeout for a load test expressed in minutes")
+ @Default.Integer(240)
+ Integer getLoadTestTimeout();
+
+ void setLoadTestTimeout(Integer timeout);
+
static <T extends LoadTestOptions> T readFromArgs(String[] args, Class<T> optionsClass) {
return PipelineOptionsFactory.fromArgs(args).withValidation().as(optionsClass);
}