You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/05/31 16:04:24 UTC
reef git commit: [REEF-1396] Fix testFailureRestart to validate that
the restarted Evaluators are received
Repository: reef
Updated Branches:
refs/heads/master 4c0207ed0 -> bf5caab94
[REEF-1396] Fix testFailureRestart to validate that the restarted Evaluators are received
This addresses the issue by:
* Adding a count for closed Evaluators and verifying it at the end of the tests.
* Adding a single Evaluator failure and single Evaluator restart case.
* Lowering the number of test Evaluators.
JIRA:
[REEF-1396](https://issues.apache.org/jira/browse/REEF-1396)
Pull Request
This closes #1013
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bf5caab9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bf5caab9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bf5caab9
Branch: refs/heads/master
Commit: bf5caab94bf4614658b3e4071034c3ecae916660
Parents: 4c0207e
Author: Andrew Chung <af...@gmail.com>
Authored: Tue May 24 11:29:13 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue May 31 09:02:53 2016 -0700
----------------------------------------------------------------------
.../tests/evaluator/failure/FailureDriver.java | 147 +++++++++++++++++++
.../tests/evaluator/failure/FailureREEF.java | 142 ++++++++++++++++++
.../tests/evaluator/failure/package-info.java | 22 +++
.../failure/parameters/NumEvaluatorsToFail.java | 32 ++++
.../parameters/NumEvaluatorsToSubmit.java | 32 ++++
.../failure/parameters/package-info.java | 22 +++
.../reef/tests/yarn/failure/FailureDriver.java | 112 --------------
.../reef/tests/yarn/failure/FailureREEF.java | 134 -----------------
.../reef/tests/yarn/failure/package-info.java | 22 ---
.../java/org/apache/reef/tests/FailureTest.java | 18 ++-
10 files changed, 412 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java
new file mode 100644
index 0000000..65b4913
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.poison.PoisonedConfiguration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for the Evaluator failure test.
+ *
+ * <p>At start it requests {@code numEvaluatorsToSubmit} Evaluators. The first {@code numEvaluatorsToFail}
+ * Evaluators to be allocated (from the initial request or from a replacement request) receive a poisoned
+ * context with crash probability 1; every Evaluator allocated after that is closed right away. Each failed
+ * Evaluator triggers a request for one replacement Evaluator. Since every poisoned Evaluator is expected to
+ * fail and be replaced once, {@code numEvaluatorsToSubmit} Evaluators should be closed in total, which the
+ * stop handler verifies.</p>
+ */
+@Unit
+public class FailureDriver {
+
+  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
+
+  /** Number of Evaluators requested when the driver starts. */
+  private final int numEvaluatorsToSubmit;
+
+  /** Number of allocated Evaluators that receive a poisoned context. */
+  private final int numEvaluatorsToFail;
+
+  /** Poisoned contexts still to submit; an allocation that finds this counter non-positive is closed. */
+  private final AtomicInteger numEvaluatorsLeftToSubmit;
+
+  /** Evaluators still expected to be closed; should be 0 when the driver stops. */
+  private final AtomicInteger numEvaluatorsLeftToClose;
+
+  /** Used for the initial request and for every replacement request. */
+  private final EvaluatorRequestor requestor;
+
+  /**
+   * @param numEvaluatorsToSubmit number of Evaluators to request at start; must be greater than 0.
+   * @param numEvaluatorsToFail   number of allocated Evaluators to poison (and thereby fail and resubmit);
+   *                              must be between 1 and {@code numEvaluatorsToSubmit}, inclusive.
+   * @param requestor             used to request the initial and the replacement Evaluators.
+   */
+  @Inject
+  public FailureDriver(@Parameter(NumEvaluatorsToSubmit.class) final int numEvaluatorsToSubmit,
+                       @Parameter(NumEvaluatorsToFail.class) final int numEvaluatorsToFail,
+                       final EvaluatorRequestor requestor) {
+    Validate.isTrue(numEvaluatorsToSubmit > 0, "The number of Evaluators to submit must be greater than 0.");
+    Validate.inclusiveBetween(1, numEvaluatorsToSubmit, numEvaluatorsToFail,
+        "The number of Evaluators to fail must be between 1 and numEvaluatorsToSubmit, inclusive.");
+
+    this.numEvaluatorsToSubmit = numEvaluatorsToSubmit;
+    this.numEvaluatorsToFail = numEvaluatorsToFail;
+
+    // Only numEvaluatorsToFail Evaluators are poisoned. Seeding this counter with numEvaluatorsToSubmit would
+    // poison every Evaluator and silently ignore the NumEvaluatorsToFail parameter.
+    this.numEvaluatorsLeftToSubmit = new AtomicInteger(numEvaluatorsToFail);
+
+    // numEvaluatorsToSubmit + numEvaluatorsToFail Evaluators get allocated in total (the initial ones plus one
+    // replacement per failure) and numEvaluatorsToFail of them are poisoned, so numEvaluatorsToSubmit are closed.
+    this.numEvaluatorsLeftToClose = new AtomicInteger(numEvaluatorsToSubmit);
+
+    this.requestor = requestor;
+    LOG.info("Driver instantiated");
+  }
+
+  /**
+   * Handles the StartTime event: requests {@code numEvaluatorsToSubmit} Evaluators, each with memory 64 and
+   * one core.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.FINE, "Request {0} Evaluators.", numEvaluatorsToSubmit);
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(numEvaluatorsToSubmit)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator: submits a poisoned context to the first {@code numEvaluatorsToFail} allocated
+   * Evaluators and closes (and counts) every Evaluator allocated after that.
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String evalId = allocatedEvaluator.getId();
+      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
+      // Atomically claim a poison slot; once the counter is no longer positive the Evaluator is closed instead.
+      if (numEvaluatorsLeftToSubmit.getAndDecrement() > 0) {
+        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", numEvaluatorsLeftToSubmit);
+        allocatedEvaluator.submitContext(
+            Tang.Factory.getTang()
+                .newConfigurationBuilder(
+                    ContextConfiguration.CONF
+                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
+                        .build(),
+                    PoisonedConfiguration.CONTEXT_CONF
+                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
+                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
+                        .build())
+                .build());
+      } else {
+        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
+        allocatedEvaluator.close();
+        FailureDriver.this.numEvaluatorsLeftToClose.decrementAndGet();
+      }
+    }
+  }
+
+  /**
+   * Handles FailedEvaluator: requests one replacement Evaluator with the same resources as the initial request.
+   */
+  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /**
+   * Handles StopTime: checks that all failed Evaluators were resubmitted and their replacements received.
+   * Throws {@link DriverSideFailure} if the number of closed Evaluators differs from
+   * {@code numEvaluatorsToSubmit}.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime stopTime) {
+      final int numEvaluatorsToClose = FailureDriver.this.numEvaluatorsLeftToClose.get();
+      if (numEvaluatorsToClose != 0) {
+        final String message = "Got RuntimeStop Event. Expected to close " + numEvaluatorsToSubmit + " Evaluators " +
+            "but only " + (numEvaluatorsToSubmit - numEvaluatorsToClose) + " Evaluators were closed.";
+        LOG.log(Level.SEVERE, message);
+        throw new DriverSideFailure(message);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java
new file mode 100644
index 0000000..c9828b4
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Entry point class for REEF failure test.
+ *
+ * <p>Launches {@link FailureDriver} on the local runtime or on YARN and reports the resulting
+ * {@link LauncherStatus}.</p>
+ */
+public final class FailureREEF {
+  /**
+   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
+   */
+  public static final int MAX_NUMBER_OF_EVALUATORS = 16;
+
+  /**
+   * Number of Evaluators that {@link #main(String[])} asks the driver to submit.
+   */
+  private static final int MAIN_NUM_EVALUATORS_TO_SUBMIT = 40;
+
+  /**
+   * Number of Evaluators that {@link #main(String[])} asks the driver to fail and resubmit.
+   */
+  private static final int MAIN_NUM_EVALUATORS_TO_FAIL = 10;
+
+  private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName());
+
+  /**
+   * Parses the {@code local} and {@code timeout} short-named command line options.
+   *
+   * @param aArgs the command line arguments.
+   * @return a configuration with the {@link Local} and {@link TimeOut} values found in {@code aArgs}.
+   * @throws RuntimeException if the command line cannot be parsed (the cause is logged and attached).
+   */
+  private static Configuration parseCommandLine(final String[] aArgs) {
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    try {
+      new CommandLine(cb)
+          .registerShortNameOfClass(Local.class)
+          .registerShortNameOfClass(TimeOut.class)
+          .processCommandLine(aArgs);
+      return cb.build();
+    } catch (final BindException | IOException ex) {
+      final String msg = "Unable to parse command line";
+      LOG.log(Level.SEVERE, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  /**
+   * Builds the runtime configuration for the failure test.
+   *
+   * @param isLocal {@code true} for the local runtime (capped at {@link #MAX_NUMBER_OF_EVALUATORS} Evaluators),
+   *                {@code false} for YARN.
+   * @return (immutable) TANG Configuration object.
+   * @throws BindException if the runtime configuration cannot be built.
+   */
+  private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException {
+
+    final Configuration runtimeConfiguration;
+
+    if (isLocal) {
+      LOG.log(Level.INFO, "Running Failure demo on the local runtime");
+      runtimeConfiguration = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
+          .build();
+    } else {
+      LOG.log(Level.INFO, "Running Failure demo on YARN");
+      runtimeConfiguration = YarnClientConfiguration.CONF.build();
+    }
+
+    return runtimeConfiguration;
+  }
+
+  /**
+   * Launches {@link FailureDriver} and returns the job's final status.
+   *
+   * @param runtimeConfig         configuration of the runtime (local or YARN) to launch on.
+   * @param timeout               job timeout passed to {@code DriverLauncher.run}; {@link #main(String[])} passes
+   *                              milliseconds.
+   * @param numEvaluatorsToSubmit value bound to {@link NumEvaluatorsToSubmit}.
+   * @param numEvaluatorsToFail   value bound to {@link NumEvaluatorsToFail}.
+   * @return the status reported by the launcher when the job completes.
+   * @throws InjectionException propagated from the driver launcher.
+   */
+  public static LauncherStatus runFailureReef(
+      final Configuration runtimeConfig, final int timeout, final int numEvaluatorsToSubmit,
+      final int numEvaluatorsToFail) throws InjectionException {
+
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STOP, FailureDriver.StopHandler.class)
+        .build();
+
+    final Configuration namedParamsConf = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(NumEvaluatorsToSubmit.class, Integer.toString(numEvaluatorsToSubmit))
+        .bindNamedParameter(NumEvaluatorsToFail.class, Integer.toString(numEvaluatorsToFail))
+        .build();
+
+    final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig)
+        .run(Configurations.merge(driverConf, namedParamsConf), timeout);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", state);
+    return state;
+  }
+
+  /**
+   * Runs the failure test using the {@code local} and {@code timeout} (minutes) command line options.
+   *
+   * @param args command line arguments.
+   * @throws InjectionException if the parsed parameters cannot be injected or the job cannot be launched.
+   */
+  public static void main(final String[] args) throws InjectionException {
+    final Configuration commandLineConf = parseCommandLine(args);
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final boolean isLocal = injector.getNamedInstance(Local.class);
+    // TimeOut is given in minutes; convert to milliseconds.
+    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
+    runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout,
+        MAIN_NUM_EVALUATORS_TO_SUBMIT, MAIN_NUM_EVALUATORS_TO_FAIL);
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private FailureREEF() {
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Number of minutes before timeout.
+   */
+  @NamedParameter(doc = "Number of minutes before timeout",
+      short_name = "timeout", default_value = "2")
+  public static final class TimeOut implements Name<Integer> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java
new file mode 100644
index 0000000..ca60a12
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for Evaluator failures: the driver poisons allocated Evaluators so that they fail, requests one
+ * replacement per failure, and checks at shutdown that the expected number of Evaluators were closed.
+ */
+package org.apache.reef.tests.evaluator.failure;
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java
new file mode 100644
index 0000000..c5123ad
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tests.evaluator.failure.FailureDriver;
+
+/**
+ * Named parameter: how many allocated Evaluators {@link FailureDriver} poisons, and therefore fails and resubmits.
+ * {@link FailureDriver} requires this to be between 1 and {@link NumEvaluatorsToSubmit}, inclusive.
+ */
+@NamedParameter(doc = "The number of Evaluators to fail and resubmit in FailureDriver.")
+public final class NumEvaluatorsToFail implements Name<Integer> {
+  /**
+   * Not instantiable: the class serves only as a Tang parameter name.
+   */
+  private NumEvaluatorsToFail() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java
new file mode 100644
index 0000000..7572f49
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tests.evaluator.failure.FailureDriver;
+
+/**
+ * Named parameter: how many Evaluators {@link FailureDriver} requests when it starts.
+ * {@link FailureDriver} requires this to be greater than 0.
+ */
+@NamedParameter(doc = "The number of Evaluators to submit in FailureDriver.")
+public final class NumEvaluatorsToSubmit implements Name<Integer> {
+  /**
+   * Not instantiable: the class serves only as a Tang parameter name.
+   */
+  private NumEvaluatorsToSubmit() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java
new file mode 100644
index 0000000..72a5101
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tang named parameters for {@link org.apache.reef.tests.evaluator.failure.FailureDriver}: how many
+ * Evaluators it requests and how many of them it fails and resubmits.
+ */
+package org.apache.reef.tests.evaluator.failure.parameters;
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
deleted file mode 100644
index 037b2c2..0000000
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.tests.yarn.failure;
-
-import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.poison.PoisonedConfiguration;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
-
-import javax.inject.Inject;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver for failure test.
- */
-@Unit
-public class FailureDriver {
-
- private static final int NUM_EVALUATORS = 40;
- private static final int NUM_FAILURES = 10;
- private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES);
- private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
- private final EvaluatorRequestor requestor;
-
- @Inject
- public FailureDriver(final EvaluatorRequestor requestor) {
- this.requestor = requestor;
- LOG.info("Driver instantiated");
- }
-
- /**
- * Handles the StartTime event: Request as single Evaluator.
- */
- final class StartHandler implements EventHandler<StartTime> {
- @Override
- public void onNext(final StartTime startTime) {
- LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS);
- FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
- .setNumber(NUM_EVALUATORS)
- .setMemory(64)
- .setNumberOfCores(1)
- .build());
- }
- }
-
- /**
- * Handles AllocatedEvaluator: Submit a poisoned context.
- */
- final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
- @Override
- public void onNext(final AllocatedEvaluator allocatedEvaluator) {
- final String evalId = allocatedEvaluator.getId();
- LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
- if (toSubmit.getAndDecrement() > 0) {
- LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit);
- allocatedEvaluator.submitContext(
- Tang.Factory.getTang()
- .newConfigurationBuilder(
- ContextConfiguration.CONF
- .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
- .build(),
- PoisonedConfiguration.CONTEXT_CONF
- .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
- .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
- .build())
- .build());
- } else {
- LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
- allocatedEvaluator.close();
- }
- }
- }
-
- /**
- * Handles FailedEvaluator: Resubmits the single Evaluator resource request.
- */
- final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
- @Override
- public void onNext(final FailedEvaluator failedEvaluator) {
- LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
- FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
- .setNumber(1)
- .setMemory(64)
- .setNumberOfCores(1)
- .build());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java
deleted file mode 100644
index d056dad..0000000
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.tests.yarn.failure;
-
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.DriverLauncher;
-import org.apache.reef.client.LauncherStatus;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.CommandLine;
-import org.apache.reef.util.EnvironmentUtils;
-
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Entry point class for REEF failure test.
- */
-public final class FailureREEF {
- /**
- * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
- */
- public static final int MAX_NUMBER_OF_EVALUATORS = 16;
-
- private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName());
-
- private static Configuration parseCommandLine(final String[] aArgs) {
- final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
- try {
- new CommandLine(cb)
- .registerShortNameOfClass(Local.class)
- .registerShortNameOfClass(TimeOut.class)
- .processCommandLine(aArgs);
- return cb.build();
- } catch (final BindException | IOException ex) {
- final String msg = "Unable to parse command line";
- LOG.log(Level.SEVERE, msg, ex);
- throw new RuntimeException(msg, ex);
- }
- }
-
- /**
- * @return (immutable) TANG Configuration object.
- * @throws BindException if configuration injector fails.
- * @throws InjectionException if the Local.class parameter is not injected.
- */
- private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException {
-
- final Configuration runtimeConfiguration;
-
- if (isLocal) {
- LOG.log(Level.INFO, "Running Failure demo on the local runtime");
- runtimeConfiguration = LocalRuntimeConfiguration.CONF
- .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
- .build();
- } else {
- LOG.log(Level.INFO, "Running Failure demo on YARN");
- runtimeConfiguration = YarnClientConfiguration.CONF.build();
- }
-
- return runtimeConfiguration;
- }
-
- public static LauncherStatus runFailureReef(
- final Configuration runtimeConfig, final int timeout) throws InjectionException {
-
- final Configuration driverConf = DriverConfiguration.CONF
- .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class))
- .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF")
- .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class)
- .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class)
- .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class)
- .build();
-
- final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig).run(driverConf, timeout);
- LOG.log(Level.INFO, "REEF job completed: {0}", state);
- return state;
- }
-
- public static void main(final String[] args) throws InjectionException {
- final Configuration commandLineConf = parseCommandLine(args);
- final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
- final boolean isLocal = injector.getNamedInstance(Local.class);
- final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
- runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout);
- }
-
- /**
- * Empty private constructor to prohibit instantiation of utility class.
- */
- private FailureREEF() {
- }
-
- /**
- * Command line parameter = true to run locally, or false to run on YARN.
- */
- @NamedParameter(doc = "Whether or not to run on the local runtime",
- short_name = "local", default_value = "true")
- public static final class Local implements Name<Boolean> {
- }
-
- /**
- * Number of minutes before timeout.
- */
- @NamedParameter(doc = "Number of minutes before timeout",
- short_name = "timeout", default_value = "2")
- public static final class TimeOut implements Name<Integer> {
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java
deleted file mode 100644
index 816f866..0000000
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Tests for YARN failures.
- */
-package org.apache.reef.tests.yarn.failure;
http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
index a25a1bf..a2962c4 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
@@ -21,7 +21,7 @@ package org.apache.reef.tests;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tests.yarn.failure.FailureREEF;
+import org.apache.reef.tests.evaluator.failure.FailureREEF;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -45,12 +45,24 @@ public class FailureTest {
}
+ /**
+ * Submits a single Evaluator and asks FailureDriver to fail and resubmit it.
+ */
 @Test
+ public void testSingleEvaluatorFailureAndRestart() throws InjectionException {
+ runTestFailureReefWithParams(1, 1, "testSingleEvaluatorFailureAndRestart");
+ }
+
+ /**
+ * Submits 30 Evaluators and asks FailureDriver to fail and resubmit 5 of them.
+ */
+ @Test
 public void testFailureRestart() throws InjectionException {
+ runTestFailureReefWithParams(30, 5, "testFailureRestart");
+ }
+
+ /**
+ * Runs FailureREEF with the given parameters and asserts that the job succeeded.
+ *
+ * @param numEvaluatorsToSubmit number of Evaluators the driver requests at start.
+ * @param numEvaluatorsToFail number of Evaluators the driver fails and resubmits.
+ * @param testName name of the calling test, included in the assertion message.
+ * @throws InjectionException propagated from FailureREEF.runFailureReef.
+ */
+ private void runTestFailureReefWithParams(final int numEvaluatorsToSubmit,
+ final int numEvaluatorsToFail,
+ final String testName) throws InjectionException {
 final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration();
 final LauncherStatus status =
- FailureREEF.runFailureReef(runtimeConfiguration, this.testEnvironment.getTestTimeout());
+ FailureREEF.runFailureReef(runtimeConfiguration, this.testEnvironment.getTestTimeout(),
+ numEvaluatorsToSubmit, numEvaluatorsToFail);
- Assert.assertTrue("FailureReef failed: " + status, status.isSuccess());
+ Assert.assertTrue("FailureReef " + testName + " failed: " + status, status.isSuccess());
 }
}