You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2016/05/31 16:04:24 UTC

reef git commit: [REEF-1396] Fix testFailureRestart to validate that the restarted Evaluators are received

Repository: reef
Updated Branches:
  refs/heads/master 4c0207ed0 -> bf5caab94


[REEF-1396] Fix testFailureRestart to validate that the restarted Evaluators are received

This addresses the issue by
  * Adding a count of closed Evaluators and verifying it at the end of the tests.
  * Adding a test case with a single Evaluator failure and a single Evaluator restart.
  * Lowering the number of test Evaluators.

JIRA:
  [REEF-1396](https://issues.apache.org/jira/browse/REEF-1396)

Pull Request
  This closes #1013


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bf5caab9
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bf5caab9
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bf5caab9

Branch: refs/heads/master
Commit: bf5caab94bf4614658b3e4071034c3ecae916660
Parents: 4c0207e
Author: Andrew Chung <af...@gmail.com>
Authored: Tue May 24 11:29:13 2016 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue May 31 09:02:53 2016 -0700

----------------------------------------------------------------------
 .../tests/evaluator/failure/FailureDriver.java  | 147 +++++++++++++++++++
 .../tests/evaluator/failure/FailureREEF.java    | 142 ++++++++++++++++++
 .../tests/evaluator/failure/package-info.java   |  22 +++
 .../failure/parameters/NumEvaluatorsToFail.java |  32 ++++
 .../parameters/NumEvaluatorsToSubmit.java       |  32 ++++
 .../failure/parameters/package-info.java        |  22 +++
 .../reef/tests/yarn/failure/FailureDriver.java  | 112 --------------
 .../reef/tests/yarn/failure/FailureREEF.java    | 134 -----------------
 .../reef/tests/yarn/failure/package-info.java   |  22 ---
 .../java/org/apache/reef/tests/FailureTest.java |  18 ++-
 10 files changed, 412 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java
new file mode 100644
index 0000000..65b4913
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureDriver.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.poison.PoisonedConfiguration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver for failure test: poisons numEvaluatorsToFail Evaluators, re-requests each failure, closes the rest.
+ */
+@Unit
+public class FailureDriver {
+
+  private final int numEvaluatorsToSubmit;
+  private final int numEvaluatorsToFail;
+  private final AtomicInteger numEvaluatorsLeftToSubmit;
+  private final AtomicInteger numEvaluatorsLeftToClose;
+  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  public FailureDriver(@Parameter(NumEvaluatorsToSubmit.class) final int numEvaluatorsToSubmit,
+                       @Parameter(NumEvaluatorsToFail.class) final int numEvaluatorsToFail,
+                       final EvaluatorRequestor requestor) {
+    Validate.isTrue(numEvaluatorsToSubmit > 0, "The number of Evaluators to submit must be greater than 0.");
+    Validate.inclusiveBetween(1, numEvaluatorsToSubmit, numEvaluatorsToFail,
+        "The number of Evaluators to fail must be between 1 and numEvaluatorsToSubmit, inclusive.");
+
+    this.numEvaluatorsToSubmit = numEvaluatorsToSubmit;
+    this.numEvaluatorsToFail = numEvaluatorsToFail;
+    // Only the first numEvaluatorsToFail allocated Evaluators get a poisoned context; later ones are closed.
+    this.numEvaluatorsLeftToSubmit = new AtomicInteger(numEvaluatorsToFail);
+
+    // Expect numEvaluatorsToSubmit closes: every non-poisoned Evaluator plus one replacement per failure.
+    this.numEvaluatorsLeftToClose = new AtomicInteger(numEvaluatorsToSubmit);
+
+    this.requestor = requestor;
+    LOG.info("Driver instantiated");
+  }
+
+  /**
+   * Handles the StartTime event: requests numEvaluatorsToSubmit Evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.FINE, "Request {0} Evaluators.", numEvaluatorsToSubmit);
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(numEvaluatorsToSubmit)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator: poisons the first numEvaluatorsToFail Evaluators and closes all others.
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String evalId = allocatedEvaluator.getId();
+      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
+      if (numEvaluatorsLeftToSubmit.getAndDecrement() > 0) {
+        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", numEvaluatorsLeftToSubmit);
+        allocatedEvaluator.submitContext(
+            Tang.Factory.getTang()
+                .newConfigurationBuilder(
+                    ContextConfiguration.CONF
+                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
+                        .build(),
+                    PoisonedConfiguration.CONTEXT_CONF
+                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
+                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
+                        .build())
+                .build());
+      } else {
+        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
+        allocatedEvaluator.close();
+        FailureDriver.this.numEvaluatorsLeftToClose.decrementAndGet();
+      }
+    }
+  }
+
+  /**
+   * Handles FailedEvaluator: requests a single replacement Evaluator.
+   */
+  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /**
+   * Throws DriverSideFailure unless exactly numEvaluatorsToSubmit Evaluators were closed.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime stopTime) {
+      final int numEvaluatorsToClose = FailureDriver.this.numEvaluatorsLeftToClose.get();
+      if (numEvaluatorsToClose != 0) {
+        final String message = "Got RuntimeStop Event. Expected to close " + numEvaluatorsToSubmit + " Evaluators " +
+            "but " + (numEvaluatorsToSubmit - numEvaluatorsToClose) + " Evaluators were closed.";
+        LOG.log(Level.SEVERE, message);
+        throw new DriverSideFailure(message);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java
new file mode 100644
index 0000000..c9828b4
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/FailureREEF.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.DriverLauncher;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToFail;
+import org.apache.reef.tests.evaluator.failure.parameters.NumEvaluatorsToSubmit;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Entry point for the REEF Evaluator failure test; runs {@link FailureDriver} locally or on YARN.
+ */
+public final class FailureREEF {
+  /**
+   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
+   */
+  public static final int MAX_NUMBER_OF_EVALUATORS = 16;
+
+  private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName());
+  private static final int NUM_EVALUATORS_TO_SUBMIT = 40;
+  private static final int NUM_EVALUATORS_TO_FAIL = 10;
+
+  private static Configuration parseCommandLine(final String[] aArgs) {
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    try {
+      new CommandLine(cb)
+          .registerShortNameOfClass(Local.class)
+          .registerShortNameOfClass(TimeOut.class)
+          .processCommandLine(aArgs);
+      return cb.build();
+    } catch (final BindException | IOException ex) {
+      final String msg = "Unable to parse command line";
+      LOG.log(Level.SEVERE, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  /**
+   * @param isLocal true to run on the local runtime, false to run on YARN.
+   * @return (immutable) TANG Configuration object for the selected runtime.
+   * @throws BindException if the runtime configuration cannot be built.
+   */
+  private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException {
+    if (!isLocal) {
+      LOG.log(Level.INFO, "Running Failure demo on YARN");
+      return YarnClientConfiguration.CONF.build();
+    }
+    LOG.log(Level.INFO, "Running Failure demo on the local runtime");
+    return LocalRuntimeConfiguration.CONF
+        .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
+        .build();
+  }
+
+  /**
+   * Runs {@link FailureDriver} with the given Evaluator counts; {@code timeout} is in milliseconds.
+   * @return the final status of the REEF job.
+   */
+  public static LauncherStatus runFailureReef(
+      final Configuration runtimeConfig, final int timeout, final int numEvaluatorsToSubmit,
+      final int numEvaluatorsToFail) throws InjectionException {
+
+    final Configuration driverConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STOP, FailureDriver.StopHandler.class)
+        .build();
+
+    final Configuration namedParamsConf = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(NumEvaluatorsToSubmit.class, Integer.toString(numEvaluatorsToSubmit))
+        .bindNamedParameter(NumEvaluatorsToFail.class, Integer.toString(numEvaluatorsToFail))
+        .build();
+
+    final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig)
+        .run(Configurations.merge(driverConf, namedParamsConf), timeout);
+
+    LOG.log(Level.INFO, "REEF job completed: {0}", state);
+    return state;
+  }
+
+  public static void main(final String[] args) throws InjectionException {
+    final Configuration commandLineConf = parseCommandLine(args);
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final boolean isLocal = injector.getNamedInstance(Local.class);
+    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
+    runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout, NUM_EVALUATORS_TO_SUBMIT, NUM_EVALUATORS_TO_FAIL);
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private FailureREEF() {
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Number of minutes before timeout.
+   */
+  @NamedParameter(doc = "Number of minutes before timeout",
+      short_name = "timeout", default_value = "2")
+  public static final class TimeOut implements Name<Integer> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java
new file mode 100644
index 0000000..ca60a12
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for Evaluator failures.
+ */
+package org.apache.reef.tests.evaluator.failure;

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java
new file mode 100644
index 0000000..c5123ad
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToFail.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tests.evaluator.failure.FailureDriver;
+
+/**
+ * Tang named parameter: how many of the submitted Evaluators {@link FailureDriver} should fail and re-request.
+ */
+@NamedParameter(doc = "The number of Evaluators to fail and resubmit in FailureDriver.")
+public final class NumEvaluatorsToFail implements Name<Integer> {
+  private NumEvaluatorsToFail() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java
new file mode 100644
index 0000000..7572f49
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/NumEvaluatorsToSubmit.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.evaluator.failure.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tests.evaluator.failure.FailureDriver;
+
+/**
+ * Tang named parameter: how many Evaluators {@link FailureDriver} requests when it starts.
+ */
+@NamedParameter(doc = "The number of Evaluators to submit in FailureDriver.")
+public final class NumEvaluatorsToSubmit implements Name<Integer> {
+  private NumEvaluatorsToSubmit() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java
new file mode 100644
index 0000000..72a5101
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/evaluator/failure/parameters/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Parameters for {@link org.apache.reef.tests.evaluator.failure.FailureDriver}.
+ */
+package org.apache.reef.tests.evaluator.failure.parameters;

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
deleted file mode 100644
index 037b2c2..0000000
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.tests.yarn.failure;
-
-import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.poison.PoisonedConfiguration;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.event.StartTime;
-
-import javax.inject.Inject;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Driver for failure test.
- */
-@Unit
-public class FailureDriver {
-
-  private static final int NUM_EVALUATORS = 40;
-  private static final int NUM_FAILURES = 10;
-  private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES);
-  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
-  private final EvaluatorRequestor requestor;
-
-  @Inject
-  public FailureDriver(final EvaluatorRequestor requestor) {
-    this.requestor = requestor;
-    LOG.info("Driver instantiated");
-  }
-
-  /**
-   * Handles the StartTime event: Request as single Evaluator.
-   */
-  final class StartHandler implements EventHandler<StartTime> {
-    @Override
-    public void onNext(final StartTime startTime) {
-      LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS);
-      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
-          .setNumber(NUM_EVALUATORS)
-          .setMemory(64)
-          .setNumberOfCores(1)
-          .build());
-    }
-  }
-
-  /**
-   * Handles AllocatedEvaluator: Submit a poisoned context.
-   */
-  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
-    @Override
-    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
-      final String evalId = allocatedEvaluator.getId();
-      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
-      if (toSubmit.getAndDecrement() > 0) {
-        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit);
-        allocatedEvaluator.submitContext(
-            Tang.Factory.getTang()
-                .newConfigurationBuilder(
-                    ContextConfiguration.CONF
-                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
-                        .build(),
-                    PoisonedConfiguration.CONTEXT_CONF
-                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
-                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
-                        .build())
-                .build());
-      } else {
-        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
-        allocatedEvaluator.close();
-      }
-    }
-  }
-
-  /**
-   * Handles FailedEvaluator: Resubmits the single Evaluator resource request.
-   */
-  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
-    @Override
-    public void onNext(final FailedEvaluator failedEvaluator) {
-      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
-      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
-          .setNumber(1)
-          .setMemory(64)
-          .setNumberOfCores(1)
-          .build());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java
deleted file mode 100644
index d056dad..0000000
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureREEF.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.tests.yarn.failure;
-
-import org.apache.reef.client.DriverConfiguration;
-import org.apache.reef.client.DriverLauncher;
-import org.apache.reef.client.LauncherStatus;
-import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
-import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.JavaConfigurationBuilder;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.exceptions.BindException;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tang.formats.CommandLine;
-import org.apache.reef.util.EnvironmentUtils;
-
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Entry point class for REEF failure test.
- */
-public final class FailureREEF {
-  /**
-   * The upper limit on the number of Evaluators that the local resourcemanager will hand out concurrently.
-   */
-  public static final int MAX_NUMBER_OF_EVALUATORS = 16;
-
-  private static final Logger LOG = Logger.getLogger(FailureREEF.class.getName());
-
-  private static Configuration parseCommandLine(final String[] aArgs) {
-    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
-    try {
-      new CommandLine(cb)
-          .registerShortNameOfClass(Local.class)
-          .registerShortNameOfClass(TimeOut.class)
-          .processCommandLine(aArgs);
-      return cb.build();
-    } catch (final BindException | IOException ex) {
-      final String msg = "Unable to parse command line";
-      LOG.log(Level.SEVERE, msg, ex);
-      throw new RuntimeException(msg, ex);
-    }
-  }
-
-  /**
-   * @return (immutable) TANG Configuration object.
-   * @throws BindException      if configuration injector fails.
-   * @throws InjectionException if the Local.class parameter is not injected.
-   */
-  private static Configuration getRunTimeConfiguration(final boolean isLocal) throws BindException {
-
-    final Configuration runtimeConfiguration;
-
-    if (isLocal) {
-      LOG.log(Level.INFO, "Running Failure demo on the local runtime");
-      runtimeConfiguration = LocalRuntimeConfiguration.CONF
-          .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, MAX_NUMBER_OF_EVALUATORS)
-          .build();
-    } else {
-      LOG.log(Level.INFO, "Running Failure demo on YARN");
-      runtimeConfiguration = YarnClientConfiguration.CONF.build();
-    }
-
-    return runtimeConfiguration;
-  }
-
-  public static LauncherStatus runFailureReef(
-      final Configuration runtimeConfig, final int timeout) throws InjectionException {
-
-    final Configuration driverConf = DriverConfiguration.CONF
-        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(FailureDriver.class))
-        .set(DriverConfiguration.DRIVER_IDENTIFIER, "FailureREEF")
-        .set(DriverConfiguration.ON_DRIVER_STARTED, FailureDriver.StartHandler.class)
-        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, FailureDriver.EvaluatorAllocatedHandler.class)
-        .set(DriverConfiguration.ON_EVALUATOR_FAILED, FailureDriver.EvaluatorFailedHandler.class)
-        .build();
-
-    final LauncherStatus state = DriverLauncher.getLauncher(runtimeConfig).run(driverConf, timeout);
-    LOG.log(Level.INFO, "REEF job completed: {0}", state);
-    return state;
-  }
-
-  public static void main(final String[] args) throws InjectionException {
-    final Configuration commandLineConf = parseCommandLine(args);
-    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
-    final boolean isLocal = injector.getNamedInstance(Local.class);
-    final int jobTimeout = injector.getNamedInstance(TimeOut.class) * 60 * 1000;
-    runFailureReef(getRunTimeConfiguration(isLocal), jobTimeout);
-  }
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private FailureREEF() {
-  }
-
-  /**
-   * Command line parameter = true to run locally, or false to run on YARN.
-   */
-  @NamedParameter(doc = "Whether or not to run on the local runtime",
-      short_name = "local", default_value = "true")
-  public static final class Local implements Name<Boolean> {
-  }
-
-  /**
-   * Number of minutes before timeout.
-   */
-  @NamedParameter(doc = "Number of minutes before timeout",
-      short_name = "timeout", default_value = "2")
-  public static final class TimeOut implements Name<Integer> {
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java
deleted file mode 100644
index 816f866..0000000
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * Tests for YARN failures.
- */
-package org.apache.reef.tests.yarn.failure;

http://git-wip-us.apache.org/repos/asf/reef/blob/bf5caab9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
index a25a1bf..a2962c4 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/FailureTest.java
@@ -21,7 +21,7 @@ package org.apache.reef.tests;
 import org.apache.reef.client.LauncherStatus;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.tests.yarn.failure.FailureREEF;
+import org.apache.reef.tests.evaluator.failure.FailureREEF;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,12 +45,24 @@ public class FailureTest {
   }
 
   @Test
+  public void testSingleEvaluatorFailureAndRestart() throws InjectionException {
+    runTestFailureReefWithParams(1, 1, "testSingleEvaluatorFailureAndRestart");
+  }
+
+  @Test
   public void testFailureRestart() throws InjectionException {
+    runTestFailureReefWithParams(30, 5, "testFailureRestart");
+  }
+
+  private void runTestFailureReefWithParams(final int numEvaluatorsToSubmit,
+                                            final int numEvaluatorsTofail,
+                                            final String testName) throws InjectionException {
     final Configuration runtimeConfiguration = this.testEnvironment.getRuntimeConfiguration();
 
     final LauncherStatus status =
-        FailureREEF.runFailureReef(runtimeConfiguration, this.testEnvironment.getTestTimeout());
+        FailureREEF.runFailureReef(runtimeConfiguration, this.testEnvironment.getTestTimeout(),
+            numEvaluatorsToSubmit, numEvaluatorsTofail);
 
-    Assert.assertTrue("FailureReef failed: " + status, status.isSuccess());
+    Assert.assertTrue("FailureReef " + testName + " failed: " + status, status.isSuccess());
   }
 }