You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:39 UTC

[06/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java
new file mode 100644
index 0000000..cbad811
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.TestDriverLauncher;
+import org.apache.reef.util.EnvironmentUtils;
+
+/**
+ * Client for the test REEF job that fails on different stages of execution.
+ */
+public final class Client {
+
+  public static LauncherStatus run(
+      final Class<? extends Task> failTaskClass,
+      final Configuration runtimeConfig,
+      final int timeOut) throws BindException, InjectionException {
+
+    final Configuration driverConfig = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(Driver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, failTaskClass.getSimpleName())
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, Driver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_TASK_RUNNING, Driver.RunningTaskHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, Driver.ActiveContextHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STARTED, Driver.StartHandler.class)
+        .build();
+
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    cb.addConfiguration(driverConfig);
+    cb.bindNamedParameter(Driver.FailTaskName.class, failTaskClass.getSimpleName());
+
+    return TestDriverLauncher.getLauncher(runtimeConfig).run(cb.build(), timeOut);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java
new file mode 100644
index 0000000..0d3ed1d
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public final class Driver {
+
+  private static final Logger LOG = Logger.getLogger(Driver.class.getName());
+  private final transient String failTaskName;
+  private final transient EvaluatorRequestor requestor;
+  private transient String taskId;
+
+  @Inject
+  public Driver(final @Parameter(FailTaskName.class) String failTaskName,
+                final EvaluatorRequestor requestor) {
+    this.failTaskName = failTaskName;
+    this.requestor = requestor;
+  }
+
+  /**
+   * Name of the message class to specify the failing message handler.
+   */
+  @NamedParameter(doc = "Full name of the (failing) task class", short_name = "task")
+  public static final class FailTaskName implements Name<String> {
+  }
+
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+
+      try {
+
+        taskId = failTaskName + "_" + eval.getId();
+        LOG.log(Level.INFO, "Submit task: {0}", taskId);
+
+        final Configuration contextConfig =
+            ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, taskId).build();
+
+        ConfigurationModule taskConfig =
+            TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, taskId);
+
+        switch (failTaskName) {
+          case "FailTask":
+            taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTask.class);
+            break;
+          case "FailTaskCall":
+            taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTaskCall.class);
+            break;
+          case "FailTaskMsg":
+            taskConfig = taskConfig
+                .set(TaskConfiguration.TASK, FailTaskMsg.class)
+                .set(TaskConfiguration.ON_MESSAGE, FailTaskMsg.class);
+            break;
+          case "FailTaskSuspend":
+            taskConfig = taskConfig
+                .set(TaskConfiguration.TASK, FailTaskSuspend.class)
+                .set(TaskConfiguration.ON_SUSPEND, FailTaskSuspend.class);
+            break;
+          case "FailTaskStart":
+            taskConfig = taskConfig
+                .set(TaskConfiguration.TASK, FailTaskStart.class)
+                .set(TaskConfiguration.ON_TASK_STARTED, FailTaskStart.class);
+            break;
+          case "FailTaskStop":
+            taskConfig = taskConfig
+                .set(TaskConfiguration.TASK, FailTaskStop.class)
+                .set(TaskConfiguration.ON_TASK_STOP, FailTaskStop.class)
+                .set(TaskConfiguration.ON_CLOSE, FailTaskStop.CloseEventHandler.class);
+            break;
+          case "FailTaskClose":
+            taskConfig = taskConfig
+                .set(TaskConfiguration.TASK, FailTaskClose.class)
+                .set(TaskConfiguration.ON_CLOSE, FailTaskClose.class);
+            break;
+        }
+
+        eval.submitContextAndTask(contextConfig, taskConfig.build());
+
+      } catch (final BindException ex) {
+        LOG.log(Level.WARNING, "Configuration error", ex);
+        throw new DriverSideFailure("Configuration error", ex);
+      }
+    }
+  }
+
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+
+      LOG.log(Level.INFO, "TaskRuntime: {0} expect {1}",
+          new Object[]{task.getId(), taskId});
+
+      if (!taskId.equals(task.getId())) {
+        throw new DriverSideFailure("Task ID " + task.getId()
+            + " not equal expected ID " + taskId);
+      }
+
+      switch (failTaskName) {
+        case "FailTaskMsg":
+          LOG.log(Level.INFO, "TaskRuntime: Send message: {0}", task);
+          task.send(new byte[0]);
+          break;
+        case "FailTaskSuspend":
+          LOG.log(Level.INFO, "TaskRuntime: Suspend: {0}", task);
+          task.suspend();
+          break;
+        case "FailTaskStop":
+        case "FailTaskClose":
+          LOG.log(Level.INFO, "TaskRuntime: Stop/Close: {0}", task);
+          task.close();
+          break;
+      }
+    }
+  }
+
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) throws DriverSideFailure {
+      throw new DriverSideFailure("Unexpected ActiveContext message: " + context.getId());
+    }
+  }
+
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime time) {
+      LOG.log(Level.INFO, "StartTime: {0}", time);
+      Driver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(1).setMemory(128).setNumberOfCores(1).build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java
new file mode 100644
index 0000000..ff8f905
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.tests.library.exceptions.TaskSideFailure;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we create it.
+ */
+public final class FailTask implements Task {
+
+  private static final Logger LOG = Logger.getLogger(FailTask.class.getName());
+
+  @Inject
+  public FailTask() throws SimulatedTaskFailure {
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTask constructor called.");
+    LOG.log(Level.FINE, "FailTask created - failing now: {0}", ex);
+    throw ex;
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) throws TaskSideFailure {
+    final RuntimeException ex = new TaskSideFailure("FailTask.call() should never be called.");
+    LOG.log(Level.SEVERE, "FailTask.call() invoked - that should never happen!", ex);
+    throw ex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java
new file mode 100644
index 0000000..b923dd2
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we run it.
+ */
+public final class FailTaskCall implements Task {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskCall.class.getName());
+
+  @Inject
+  public FailTaskCall() {
+    LOG.info("FailTaskCall created.");
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTaskCall.call() invoked.");
+    LOG.log(Level.FINE, "FailTaskCall.call() invoked: {0}", ex);
+    throw ex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java
new file mode 100644
index 0000000..b8d25e7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we close it.
+ */
+public final class FailTaskClose implements Task, EventHandler<CloseEvent> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskClose.class.getName());
+
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskClose() {
+    LOG.fine("FailTaskClose created.");
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.fine("FailTaskClose.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.WARNING, "wait() interrupted.", ex);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  @Override
+  public void onNext(final CloseEvent event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTaskClose.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskClose.onNext() invoked. Raise exception: {0}", ex.toString());
+    throw ex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java
new file mode 100644
index 0000000..34a9909
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we send it a message.
+ */
+public final class FailTaskMsg implements Task, EventHandler<DriverMessage> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskMsg.class.getName());
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskMsg() {
+    LOG.info("FailTaskMsg created.");
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.info("FailTaskMsg.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.WARNING, "wait() interrupted.", ex);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  @Override
+  public void onNext(final DriverMessage driverMessage) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTaskMsg.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskMsg.send() invoked: {0}", ex);
+    throw ex;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java
new file mode 100644
index 0000000..9913f2e
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we invoke it.
+ */
+public final class FailTaskStart implements Task, EventHandler<TaskStart> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskStart.class.getName());
+
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskStart() {
+    LOG.info("FailTaskStart created.");
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.info("FailTaskStart.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.WARNING, "wait() interrupted.", ex);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  @Override
+  public void onNext(final TaskStart event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTaskStart.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskStart.onNext() invoked: {0}", ex);
+    throw ex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java
new file mode 100644
index 0000000..2398db0
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we stop it.
+ */
+@Unit
+public final class FailTaskStop implements Task, EventHandler<TaskStop> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskStop.class.getName());
+
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskStop() {
+    LOG.fine("FailTaskStop created.");
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.fine("FailTaskStop.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.WARNING, "wait() interrupted.", ex);
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void onNext(final TaskStop event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTaskStop.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskStop.onNext() invoked. Raise exception: {0}", ex.toString());
+    throw ex;
+  }
+
+  public final class CloseEventHandler implements EventHandler<CloseEvent> {
+    @Override
+    public void onNext(final CloseEvent event) {
+      LOG.log(Level.FINEST, "FailTaskStop.CloseEventHandler.onNext() invoked: {0}", event);
+      synchronized (FailTaskStop.this) {
+        isRunning = false;
+        FailTaskStop.this.notify();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java
new file mode 100644
index 0000000..08fcfaf
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that just fails when we invoke it.
+ */
+public final class FailTaskSuspend implements Task, EventHandler<SuspendEvent> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskSuspend.class.getName());
+
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskSuspend() {
+    LOG.info("FailTaskSuspend created.");
+  }
+
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.info("FailTaskSuspend.call() invoked. Waiting for suspend request.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.WARNING, "wait() interrupted.", ex);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  @Override
+  public void onNext(final SuspendEvent event) throws SimulatedTaskFailure {
+    // synchronized (this) {
+    //   this.isRunning = false;
+    //   this.notify();
+    // }
+    final SimulatedTaskFailure ex = new SimulatedTaskFailure("FailTaskSuspend.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskSuspend.send() invoked: {0}", ex);
+    throw ex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java
new file mode 100644
index 0000000..b035a09
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.driver;
+
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.tests.library.exceptions.ExpectedTaskException;
+import org.apache.reef.util.Exceptions;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * A handler for FailedTask that will throw a DriverSideFailure unless the FailedTask was triggered by an
+ * ExpectedTaskException in the Task.
+ */
+public final class ExpectedTaskFailureHandler implements EventHandler<FailedTask> {
+
+  @Inject
+  public ExpectedTaskFailureHandler() {
+  }
+
+  /**
+   * Checks whether the FailedTask was caused by a ExpectedTaskException.
+   *
+   * @param failedTask
+   * @throws org.apache.reef.tests.library.exceptions.DriverSideFailure if the FailedTask wasn't triggered by a
+   *                                                                    ExpectedTaskException
+   */
+
+  @Override
+  public void onNext(final FailedTask failedTask) {
+    final Optional<Throwable> reasonOptional = failedTask.getReason();
+    if (!reasonOptional.isPresent()) {
+      throw new DriverSideFailure("Received a FailedTask, but it did not contain an exception.");
+    } else if (!(Exceptions.getUltimateCause(reasonOptional.get()) instanceof ExpectedTaskException)) {
+      throw new DriverSideFailure("Received a FailedTask, but the ExpectedTaskException isn't the ultimate cause.",
+          reasonOptional.get());
+    }
+    failedTask.getActiveContext().get().close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java
new file mode 100644
index 0000000..4a74c04
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.driver;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+
+/**
+ * A Driver start handler that requests a single Evaluator of size 64MB.
+ */
+public final class OnDriverStartedAllocateOne implements EventHandler<StartTime> {
+
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  OnDriverStartedAllocateOne(EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+  }
+
+  @Override
+  public void onNext(final StartTime startTime) {
+    this.requestor.submit(EvaluatorRequest.newBuilder()
+        .setMemory(64)
+        .setNumber(1)
+        .setNumberOfCores(1)
+        .build());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java
new file mode 100644
index 0000000..cfa5bb9
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the client side.
+ */
+public class ClientSideFailure extends RuntimeException {
+
+  public ClientSideFailure() {
+  }
+
+  public ClientSideFailure(final String string) {
+    super(string);
+  }
+
+  public ClientSideFailure(final String string, final Throwable thrwbl) {
+    super(string, thrwbl);
+  }
+
+  public ClientSideFailure(final Throwable thrwbl) {
+    super(thrwbl);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java
new file mode 100644
index 0000000..0d65d7e
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the driver side.
+ */
+public class DriverSideFailure extends RuntimeException {
+
+  public DriverSideFailure() {
+  }
+
+  public DriverSideFailure(final String message) {
+    super(message);
+  }
+
+  public DriverSideFailure(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+
+  public DriverSideFailure(final Throwable cause) {
+    super(cause);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java
new file mode 100644
index 0000000..0c64e1a
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Expected Exception thrown by a Task.
+ */
+public final class ExpectedTaskException extends RuntimeException {
+  public ExpectedTaskException() {
+  }
+
+  public ExpectedTaskException(String s) {
+    super(s);
+  }
+
+  public ExpectedTaskException(String s, Throwable throwable) {
+    super(s, throwable);
+  }
+
+  public ExpectedTaskException(Throwable throwable) {
+    super(throwable);
+  }
+
+  public ExpectedTaskException(String s, Throwable throwable, boolean b, boolean b2) {
+    super(s, throwable, b, b2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java
new file mode 100644
index 0000000..7033853
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the task side.
+ */
+public class SimulatedDriverFailure extends RuntimeException {
+
+  public SimulatedDriverFailure() {
+    super();
+  }
+
+  public SimulatedDriverFailure(final String message) {
+    super(message);
+  }
+
+  public SimulatedDriverFailure(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+
+  public SimulatedDriverFailure(final Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java
new file mode 100644
index 0000000..1ced6a4
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the task side.
+ */
+public class SimulatedTaskFailure extends RuntimeException {
+
+  public SimulatedTaskFailure() {
+    super();
+  }
+
+  public SimulatedTaskFailure(final String message) {
+    super(message);
+  }
+
+  public SimulatedTaskFailure(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+
+  public SimulatedTaskFailure(final Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java
new file mode 100644
index 0000000..3fe0623
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the task side.
+ */
+public class TaskSideFailure extends RuntimeException {
+
+  public TaskSideFailure() {
+    super();
+  }
+
+  public TaskSideFailure(final String message) {
+    super(message);
+  }
+
+  public TaskSideFailure(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+
+  public TaskSideFailure(final Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java
new file mode 100644
index 0000000..ef6318b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown by the Driver in a test if a Task returned an unexpected value.
+ */
+public final class UnexpectedTaskReturnValue extends RuntimeException {
+  private final String expected;
+  private final String actual;
+
+  public UnexpectedTaskReturnValue(final String expected, final String actual) {
+    this.expected = expected;
+    this.actual = actual;
+  }
+
+  @Override
+  public String toString() {
+    return "UnexpectedTaskReturnValue{" +
+        "expected='" + expected + '\'' +
+        ", actual='" + actual + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java
new file mode 100644
index 0000000..df3a0e3
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Commonly used event handlers and task implementations in our tests
+ */
+package org.apache.reef.tests.library;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java
new file mode 100644
index 0000000..e42076a
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.tasks;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * A Task that just sends the memento back.
+ */
+public final class EchoTask implements Task {
+
+  @Inject
+  private EchoTask() {
+  }
+
+  @Override
+  public byte[] call(byte[] memento) throws Exception {
+    return memento;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java
new file mode 100644
index 0000000..c5186d4
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.tasks;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * A Task that does nothing and returns null.
+ */
+public class NoopTask implements Task {
+
+  @Inject
+  private NoopTask() {
+  }
+
+  @Override
+  public byte[] call(byte[] memento) throws Exception {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java
new file mode 100644
index 0000000..c8cdacc
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.driver;
+
+import org.apache.reef.client.*;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public final class DriverMessaging {
+
+  private static final Logger LOG = Logger.getLogger(DriverMessaging.class.getName());
+
+  private final REEF reef;
+
+  private String lastMessage = null;
+  private Optional<RunningJob> theJob = Optional.empty();
+  private LauncherStatus status = LauncherStatus.INIT;
+
+  @Inject
+  private DriverMessaging(final REEF reef) {
+    this.reef = reef;
+  }
+
+  public static LauncherStatus run(final Configuration runtimeConfiguration,
+                                   final int launcherTimeout) throws BindException, InjectionException {
+
+    final Configuration clientConfiguration = ClientConfiguration.CONF
+        .set(ClientConfiguration.ON_JOB_RUNNING, DriverMessaging.RunningJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_MESSAGE, DriverMessaging.JobMessageHandler.class)
+        .set(ClientConfiguration.ON_JOB_COMPLETED, DriverMessaging.CompletedJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_FAILED, DriverMessaging.FailedJobHandler.class)
+        .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverMessaging.RuntimeErrorHandler.class)
+        .build();
+
+    return Tang.Factory.getTang()
+        .newInjector(runtimeConfiguration, clientConfiguration)
+        .getInstance(DriverMessaging.class).run(launcherTimeout, 1000);
+  }
+
+  public synchronized void close() {
+    if (this.status.isRunning()) {
+      this.status = LauncherStatus.FORCE_CLOSED;
+    }
+    if (this.theJob.isPresent()) {
+      this.theJob.get().close();
+    }
+    this.notify();
+  }
+
+  private LauncherStatus run(final long jobTimeout, final long statusTimeout) {
+
+    final long startTime = System.currentTimeMillis();
+    LOG.log(Level.INFO, "Submitting REEF Job");
+
+    final Configuration driverConfig = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass()))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "DriverMessagingTest")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, DriverMessagingDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverMessagingDriver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverMessagingDriver.ClientMessageHandler.class)
+        .build();
+
+
+    this.reef.submit(driverConfig);
+
+    synchronized (this) {
+      while (!this.status.isDone()) {
+        LOG.log(Level.INFO, "Waiting for REEF job to finish.");
+        try {
+          this.wait(statusTimeout);
+        } catch (final InterruptedException ex) {
+          LOG.log(Level.FINER, "Waiting for REEF job interrupted.", ex);
+        }
+        if (System.currentTimeMillis() - startTime >= jobTimeout) {
+          LOG.log(Level.INFO, "Waiting for REEF job timed out after {0} sec.",
+              (System.currentTimeMillis() - startTime) / 1000);
+          break;
+        }
+      }
+    }
+
+    this.reef.close();
+    return this.status;
+  }
+
+  final class JobMessageHandler implements EventHandler<JobMessage> {
+    @Override
+    public void onNext(final JobMessage message) {
+      final String msg = new String(message.get());
+      synchronized (DriverMessaging.this) {
+        if (!msg.equals(DriverMessaging.this.lastMessage)) {
+          LOG.log(Level.SEVERE, "Expected {0} but got {1}",
+              new Object[]{DriverMessaging.this.lastMessage, msg});
+          DriverMessaging.this.status = LauncherStatus.FAILED;
+          DriverMessaging.this.notify();
+        }
+      }
+    }
+  }
+
+  final class RunningJobHandler implements EventHandler<RunningJob> {
+    @Override
+    public void onNext(final RunningJob job) {
+      LOG.log(Level.INFO, "The Job {0} is running", job.getId());
+      synchronized (DriverMessaging.this) {
+        DriverMessaging.this.status = LauncherStatus.RUNNING;
+        DriverMessaging.this.theJob = Optional.of(job);
+        DriverMessaging.this.lastMessage = "Hello, REEF!";
+        DriverMessaging.this.theJob.get().send(DriverMessaging.this.lastMessage.getBytes());
+      }
+    }
+  }
+
+  final class CompletedJobHandler implements EventHandler<CompletedJob> {
+    @Override
+    public void onNext(final CompletedJob job) {
+      LOG.log(Level.INFO, "Job Completed: {0}", job);
+      synchronized (DriverMessaging.this) {
+        DriverMessaging.this.status = LauncherStatus.COMPLETED;
+        DriverMessaging.this.notify();
+      }
+    }
+  }
+
+  final class FailedJobHandler implements EventHandler<FailedJob> {
+    @Override
+    public void onNext(final FailedJob job) {
+      LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), job.getReason().orElse(null));
+      synchronized (DriverMessaging.this) {
+        DriverMessaging.this.status = LauncherStatus.FAILED(job.getReason());
+        DriverMessaging.this.notify();
+      }
+    }
+  }
+
+  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+    @Override
+    public void onNext(final FailedRuntime error) {
+      LOG.log(Level.SEVERE, "Received a runtime error: " + error, error.getReason().orElse(null));
+      synchronized (DriverMessaging.this) {
+        DriverMessaging.this.status = LauncherStatus.FAILED(error.getReason());
+        DriverMessaging.this.notify();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java
new file mode 100644
index 0000000..ac31c31
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.driver;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Dummy implementation of a driver.
+ */
+@Unit
+final class DriverMessagingDriver {
+
+  private static final Logger LOG = Logger.getLogger(DriverMessagingDriver.class.getName());
+
+  private static final int DELAY = 2000; // 2 sec.
+
+  private final Clock clock;
+  private final JobMessageObserver client;
+
+  @Inject
+  DriverMessagingDriver(final Clock clock, final JobMessageObserver client) {
+    this.clock = clock;
+    this.client = client;
+  }
+
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      // Schedule an alarm to not go idle immediately
+      clock.scheduleAlarm(DELAY, new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm alarm) {
+        }
+      });
+    }
+  }
+
+  /**
+   * Sends the message back to the client and schedules an alarm in 500ms
+   * such that the Driver does not immediately go idle.
+   */
+  final class ClientMessageHandler implements EventHandler<byte[]> {
+    @Override
+    public void onNext(final byte[] message) {
+      LOG.log(Level.INFO, "Message received: {0}", String.valueOf(message));
+      client.sendMessageToClient(message);
+      clock.scheduleAlarm(DELAY, new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm alarm) {
+        }
+      });
+    }
+  }
+
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      throw new RuntimeException("This should never be called");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java
new file mode 100644
index 0000000..657c5a3
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.task;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.driver.task.TaskMessage;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public final class TaskMessagingDriver {
+
+  private static final Logger LOG = Logger.getLogger(TaskMessagingDriver.class.getName());
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO");
+  private static final int DELAY = 1000; // send message to Task 1 sec. after TaskRuntime
+
+  private final transient JobMessageObserver client;
+  private final transient Clock clock;
+
+  @Inject
+  public TaskMessagingDriver(final JobMessageObserver client, final Clock clock) {
+    this.client = client;
+    this.clock = clock;
+  }
+
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      final String taskId = "Task_" + eval.getId();
+      LOG.log(Level.INFO, "Submit task: {0}", taskId);
+
+      final Configuration taskConfig = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, taskId)
+          .set(TaskConfiguration.TASK, TaskMessagingTask.class)
+          .set(TaskConfiguration.ON_MESSAGE, TaskMessagingTask.DriverMessageHandler.class)
+          .set(TaskConfiguration.ON_SEND_MESSAGE, TaskMessagingTask.class)
+          .build();
+      eval.submitTask(taskConfig);
+    }
+  }
+
+  public final class TaskRunningHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+      LOG.log(Level.FINE, "TaskRuntime: {0}", task.getId());
+      clock.scheduleAlarm(DELAY, new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm alarm) {
+          task.send(HELLO_STR);
+        }
+      });
+    }
+  }
+
+  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
+    @Override
+    public void onNext(final TaskMessage msg) {
+      LOG.log(Level.FINE, "TaskMessage: from {0}: {1}",
+          new Object[]{msg.getId(), CODEC.decode(msg.get())});
+      if (!Arrays.equals(msg.get(), HELLO_STR)) {
+        final RuntimeException ex = new DriverSideFailure("Unexpected message: " + CODEC.decode(msg.get()));
+        LOG.log(Level.SEVERE, "Bad message from " + msg.getId(), ex);
+        throw ex;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java
new file mode 100644
index 0000000..4c60c56
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.task;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that receives a message and sends it back to the driver.
+ */
+@Unit
+public final class TaskMessagingTask implements Task, TaskMessageSource {
+
+  private static final Logger LOG = Logger.getLogger(TaskMessagingTask.class.getName());
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final TaskMessage INIT_MESSAGE = TaskMessage.from("", CODEC.encode("MESSAGE::INIT"));
+  private transient boolean isRunning = true;
+  private transient Optional<TaskMessage> message = Optional.empty();
+
+  @Inject
+  public TaskMessagingTask() {
+    LOG.info("TaskMsg created.");
+  }
+
+  @Override
+  public synchronized byte[] call(final byte[] memento) {
+    LOG.info("TaskMsg.call() invoked. Waiting for the message.");
+    while (this.isRunning) {
+      try {
+        this.wait();
+      } catch (final InterruptedException ex) {
+        LOG.log(Level.WARNING, "wait() interrupted.", ex);
+      }
+    }
+    return this.message.orElse(INIT_MESSAGE).get();
+  }
+
+  @Override
+  public synchronized Optional<TaskMessage> getMessage() {
+    LOG.log(Level.INFO, "TaskMsg.getMessage() invoked: {0}",
+        CODEC.decode(this.message.orElse(INIT_MESSAGE).get()));
+    if (this.message.isPresent()) {
+      this.isRunning = false;
+      this.notify();
+    }
+    return this.message;
+  }
+
+  public class DriverMessageHandler implements EventHandler<DriverMessage> {
+    @Override
+    public void onNext(DriverMessage driverMessage) {
+      final byte[] message = driverMessage.get().get();
+      LOG.log(Level.INFO, "TaskMsg.send() invoked: {0}", CODEC.decode(message));
+      TaskMessagingTask.this.message = Optional.of(TaskMessage.from(this.toString(), message));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java
new file mode 100644
index 0000000..e833e43
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.statepassing;
+
+import javax.inject.Inject;
+
+public class Counter {
+
+  private int value = 0;
+
+  @Inject
+  public Counter() {
+  }
+
+  public void increment() {
+    this.value += 1;
+  }
+
+  public int getValue() {
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java
new file mode 100644
index 0000000..4b7bf39
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.statepassing;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.ServiceConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public class StatePassingDriver {
+
+  private static final Logger LOG = Logger.getLogger(StatePassingDriver.class.getName());
+
+  private static final int PASSES = 2;
+  private final JobMessageObserver client;
+  private int pass = 0;
+
+  @Inject
+  public StatePassingDriver(final JobMessageObserver client) {
+    this.client = client;
+  }
+
+  private static boolean allEqual(final byte value, final byte[] bytes) {
+    for (int i = 0; i < bytes.length; ++i) {
+      if (bytes[i] != value) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void nextPass(final ActiveContext activeContext) {
+    try {
+      activeContext.submitTask(TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "StatePassing-" + pass)
+          .set(TaskConfiguration.TASK, StatePassingTask.class)
+          .build());
+      ++pass;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eb) {
+      final JavaConfigurationBuilder b = Tang.Factory.getTang().newConfigurationBuilder();
+      try {
+        final Configuration contextConfiguration = ContextConfiguration.CONF
+            .set(ContextConfiguration.IDENTIFIER, "StatePassingContext")
+            .build();
+
+        final Configuration serviceConfiguration = ServiceConfiguration.CONF
+            .set(ServiceConfiguration.SERVICES, Counter.class)
+            .build();
+
+        eb.submitContextAndService(contextConfiguration, serviceConfiguration);
+      } catch (final BindException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  final class ContextActiveHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      nextPass(activeContext);
+    }
+  }
+
+  final class TaskCompletedHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask completed) {
+      LOG.log(Level.INFO, "Received a completed task: " + completed);
+      final byte[] message = completed.get();
+
+      if (message.length != pass) {
+        final String msg = "Expected message of length " + pass + ", but got message of length " + message.length;
+        final RuntimeException ex = new RuntimeException(msg);
+        throw ex;
+      }
+      if (!allEqual((byte) 1, message)) {
+        final RuntimeException ex = new RuntimeException("Did not get the right message");
+        throw ex;
+      }
+
+      if (pass < PASSES) {
+        LOG.log(Level.INFO, "Submitting the next Task");
+        nextPass(completed.getActiveContext());
+      } else {
+        LOG.log(Level.INFO, "Done");
+        completed.getActiveContext().close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java
new file mode 100644
index 0000000..d7176dc
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.statepassing;
+
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+
+public class StatePassingTask implements Task {
+
+
+  private final Counter c;
+
+  @Inject
+  public StatePassingTask(final Counter c) {
+    this.c = c;
+  }
+
+
+  private static byte[] makeArray(final int size, final byte value) {
+    final byte[] result = new byte[size];
+    Arrays.fill(result, value);
+    return result;
+  }
+
+
+  @Override
+  public byte[] call(byte[] memento) throws Exception {
+    this.c.increment();
+    return makeArray(this.c.getValue(), (byte) 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
new file mode 100644
index 0000000..1cc7abd
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.yarn.failure;
+
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.poison.PoisonedConfiguration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public class FailureDriver {
+
+  private static final int NUM_EVALUATORS = 40;
+  private static final int NUM_FAILURES = 10;
+  private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES);
+  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  public FailureDriver(final EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+    LOG.info("Driver instantiated");
+  }
+
+  /**
+   * Handles the StartTime event: Request as single Evaluator.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS);
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(NUM_EVALUATORS)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator: Submit a poisoned context.
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String evalId = allocatedEvaluator.getId();
+      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
+      if (toSubmit.getAndDecrement() > 0) {
+        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", toSubmit);
+        allocatedEvaluator.submitContext(
+            Tang.Factory.getTang()
+                .newConfigurationBuilder(
+                    ContextConfiguration.CONF
+                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
+                        .build(),
+                    PoisonedConfiguration.CONTEXT_CONF
+                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
+                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
+                        .build())
+                .build());
+      } else {
+        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
+        allocatedEvaluator.close();
+      }
+    }
+  }
+
+  /**
+   * Handles FailedEvaluator: Resubmits the single Evaluator resource request.
+   */
+  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+}