You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:39 UTC
[06/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java
new file mode 100644
index 0000000..cbad811
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Client.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.TestDriverLauncher;
+import org.apache.reef.util.EnvironmentUtils;
+
+/**
+ * Client for the test REEF job whose task fails at a chosen stage of execution.
+ */
+public final class Client {
+
+  /**
+   * Assembles the driver configuration for the given failing task and runs the job.
+   *
+   * @param failTaskClass task class under test; its simple name is used both as the driver
+   *                      identifier and as the value bound to {@link Driver.FailTaskName}.
+   * @param runtimeConfig configuration handed to {@code TestDriverLauncher.getLauncher()}.
+   * @param timeOut       timeout passed through unchanged to the launcher's {@code run()}
+   *                      (its unit is defined by the launcher, not by this class).
+   * @return the status returned by the launcher.
+   * @throws BindException      if the configuration cannot be bound.
+   * @throws InjectionException if the launcher cannot be injected.
+   */
+  public static LauncherStatus run(
+      final Class<? extends Task> failTaskClass,
+      final Configuration runtimeConfig,
+      final int timeOut) throws BindException, InjectionException {
+
+    // The same short name identifies the driver and selects the task inside Driver.
+    final String taskName = failTaskClass.getSimpleName();
+
+    final Configuration driverHandlers = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(Driver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, taskName)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, Driver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_TASK_RUNNING, Driver.RunningTaskHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, Driver.ActiveContextHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STARTED, Driver.StartHandler.class)
+        .build();
+
+    final JavaConfigurationBuilder jobConfigBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    jobConfigBuilder.addConfiguration(driverHandlers);
+    jobConfigBuilder.bindNamedParameter(Driver.FailTaskName.class, taskName);
+
+    return TestDriverLauncher.getLauncher(runtimeConfig).run(jobConfigBuilder.build(), timeOut);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java
new file mode 100644
index 0000000..0d3ed1d
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/Driver.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public final class Driver {
+
+ private static final Logger LOG = Logger.getLogger(Driver.class.getName());
+ private final transient String failTaskName;
+ private final transient EvaluatorRequestor requestor;
+ private transient String taskId;
+
+ @Inject
+ public Driver(final @Parameter(FailTaskName.class) String failTaskName,
+ final EvaluatorRequestor requestor) {
+ this.failTaskName = failTaskName;
+ this.requestor = requestor;
+ }
+
+ /**
+ * Name of the message class to specify the failing message handler.
+ */
+ @NamedParameter(doc = "Full name of the (failing) task class", short_name = "task")
+ public static final class FailTaskName implements Name<String> {
+ }
+
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+
+ try {
+
+ taskId = failTaskName + "_" + eval.getId();
+ LOG.log(Level.INFO, "Submit task: {0}", taskId);
+
+ final Configuration contextConfig =
+ ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, taskId).build();
+
+ ConfigurationModule taskConfig =
+ TaskConfiguration.CONF.set(TaskConfiguration.IDENTIFIER, taskId);
+
+ switch (failTaskName) {
+ case "FailTask":
+ taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTask.class);
+ break;
+ case "FailTaskCall":
+ taskConfig = taskConfig.set(TaskConfiguration.TASK, FailTaskCall.class);
+ break;
+ case "FailTaskMsg":
+ taskConfig = taskConfig
+ .set(TaskConfiguration.TASK, FailTaskMsg.class)
+ .set(TaskConfiguration.ON_MESSAGE, FailTaskMsg.class);
+ break;
+ case "FailTaskSuspend":
+ taskConfig = taskConfig
+ .set(TaskConfiguration.TASK, FailTaskSuspend.class)
+ .set(TaskConfiguration.ON_SUSPEND, FailTaskSuspend.class);
+ break;
+ case "FailTaskStart":
+ taskConfig = taskConfig
+ .set(TaskConfiguration.TASK, FailTaskStart.class)
+ .set(TaskConfiguration.ON_TASK_STARTED, FailTaskStart.class);
+ break;
+ case "FailTaskStop":
+ taskConfig = taskConfig
+ .set(TaskConfiguration.TASK, FailTaskStop.class)
+ .set(TaskConfiguration.ON_TASK_STOP, FailTaskStop.class)
+ .set(TaskConfiguration.ON_CLOSE, FailTaskStop.CloseEventHandler.class);
+ break;
+ case "FailTaskClose":
+ taskConfig = taskConfig
+ .set(TaskConfiguration.TASK, FailTaskClose.class)
+ .set(TaskConfiguration.ON_CLOSE, FailTaskClose.class);
+ break;
+ }
+
+ eval.submitContextAndTask(contextConfig, taskConfig.build());
+
+ } catch (final BindException ex) {
+ LOG.log(Level.WARNING, "Configuration error", ex);
+ throw new DriverSideFailure("Configuration error", ex);
+ }
+ }
+ }
+
+ final class RunningTaskHandler implements EventHandler<RunningTask> {
+ @Override
+ public void onNext(final RunningTask task) {
+
+ LOG.log(Level.INFO, "TaskRuntime: {0} expect {1}",
+ new Object[]{task.getId(), taskId});
+
+ if (!taskId.equals(task.getId())) {
+ throw new DriverSideFailure("Task ID " + task.getId()
+ + " not equal expected ID " + taskId);
+ }
+
+ switch (failTaskName) {
+ case "FailTaskMsg":
+ LOG.log(Level.INFO, "TaskRuntime: Send message: {0}", task);
+ task.send(new byte[0]);
+ break;
+ case "FailTaskSuspend":
+ LOG.log(Level.INFO, "TaskRuntime: Suspend: {0}", task);
+ task.suspend();
+ break;
+ case "FailTaskStop":
+ case "FailTaskClose":
+ LOG.log(Level.INFO, "TaskRuntime: Stop/Close: {0}", task);
+ task.close();
+ break;
+ }
+ }
+ }
+
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(final ActiveContext context) throws DriverSideFailure {
+ throw new DriverSideFailure("Unexpected ActiveContext message: " + context.getId());
+ }
+ }
+
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime time) {
+ LOG.log(Level.INFO, "StartTime: {0}", time);
+ Driver.this.requestor.submit(EvaluatorRequest.newBuilder()
+ .setNumber(1).setMemory(128).setNumberOfCores(1).build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java
new file mode 100644
index 0000000..ff8f905
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTask.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.tests.library.exceptions.TaskSideFailure;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task whose constructor always throws, so it fails as soon as it is created.
+ */
+public final class FailTask implements Task {
+
+  private static final Logger LOG = Logger.getLogger(FailTask.class.getName());
+
+  /**
+   * Always fails.
+   *
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Inject
+  public FailTask() throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTask constructor called.");
+    LOG.log(Level.FINE, "FailTask created - failing now: {0}", simulated);
+    throw simulated;
+  }
+
+  /**
+   * Not expected to run, because construction already fails; logs at SEVERE and throws if it does.
+   *
+   * @param memento ignored.
+   * @return never returns normally.
+   * @throws TaskSideFailure unconditionally.
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws TaskSideFailure {
+    final TaskSideFailure unexpected = new TaskSideFailure("FailTask.call() should never be called.");
+    LOG.log(Level.SEVERE, "FailTask.call() invoked - that should never happen!", unexpected);
+    throw unexpected;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java
new file mode 100644
index 0000000..b923dd2
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskCall.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that is created normally and fails as soon as {@code call()} runs.
+ */
+public final class FailTaskCall implements Task {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskCall.class.getName());
+
+  @Inject
+  public FailTaskCall() {
+    LOG.info("FailTaskCall created.");
+  }
+
+  /**
+   * Always fails.
+   *
+   * @param memento ignored.
+   * @return never returns normally.
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTaskCall.call() invoked.");
+    LOG.log(Level.FINE, "FailTaskCall.call() invoked: {0}", simulated);
+    throw simulated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java
new file mode 100644
index 0000000..b8d25e7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskClose.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that waits inside {@code call()} and throws a simulated failure
+ * when it receives a {@link CloseEvent}.
+ */
+public final class FailTaskClose implements Task, EventHandler<CloseEvent> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskClose.class.getName());
+
+  /** Loop guard for {@link #call(byte[])}; no code in this class ever sets it to false. */
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskClose() {
+    LOG.fine("FailTaskClose created.");
+  }
+
+  /**
+   * Waits on this object's monitor for as long as {@code isRunning} is set.
+   * An interrupt is logged and the wait is resumed.
+   *
+   * @param memento ignored.
+   * @return an empty byte array if the loop ever ends.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.fine("FailTaskClose.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException interrupted) {
+          LOG.log(Level.WARNING, "wait() interrupted.", interrupted);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  /**
+   * Close handler: always raises the simulated failure.
+   *
+   * @param event the close request; not inspected.
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Override
+  public void onNext(final CloseEvent event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTaskClose.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskClose.onNext() invoked. Raise exception: {0}", simulated.toString());
+    throw simulated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java
new file mode 100644
index 0000000..34a9909
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskMsg.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that waits inside {@code call()} and throws a simulated failure
+ * when it receives a {@link DriverMessage}.
+ */
+public final class FailTaskMsg implements Task, EventHandler<DriverMessage> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskMsg.class.getName());
+
+  /** Loop guard for {@link #call(byte[])}; no code in this class ever sets it to false. */
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskMsg() {
+    LOG.info("FailTaskMsg created.");
+  }
+
+  /**
+   * Waits on this object's monitor for as long as {@code isRunning} is set.
+   * An interrupt is logged and the wait is resumed.
+   *
+   * @param memento ignored.
+   * @return an empty byte array if the loop ever ends.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.info("FailTaskMsg.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException interrupted) {
+          LOG.log(Level.WARNING, "wait() interrupted.", interrupted);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  /**
+   * Message handler: always raises the simulated failure.
+   *
+   * @param driverMessage the message from the driver; not inspected.
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Override
+  public void onNext(final DriverMessage driverMessage) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTaskMsg.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskMsg.send() invoked: {0}", simulated);
+    throw simulated;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java
new file mode 100644
index 0000000..9913f2e
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStart.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task whose {@link TaskStart} handler throws a simulated failure.
+ */
+public final class FailTaskStart implements Task, EventHandler<TaskStart> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskStart.class.getName());
+
+  /** Loop guard for {@link #call(byte[])}; no code in this class ever sets it to false. */
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskStart() {
+    LOG.info("FailTaskStart created.");
+  }
+
+  /**
+   * Waits on this object's monitor for as long as {@code isRunning} is set.
+   * An interrupt is logged and the wait is resumed.
+   *
+   * @param memento ignored.
+   * @return an empty byte array if the loop ever ends.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.info("FailTaskStart.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException interrupted) {
+          LOG.log(Level.WARNING, "wait() interrupted.", interrupted);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  /**
+   * Task-start handler: always raises the simulated failure.
+   *
+   * @param event the start notification; not inspected.
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Override
+  public void onNext(final TaskStart event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTaskStart.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskStart.onNext() invoked: {0}", simulated);
+    throw simulated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java
new file mode 100644
index 0000000..2398db0
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskStop.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task whose {@link TaskStop} handler throws a simulated failure.
+ * A {@link CloseEvent} releases the waiting {@code call()} so the task can finish.
+ */
+@Unit
+public final class FailTaskStop implements Task, EventHandler<TaskStop> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskStop.class.getName());
+
+  /** Loop guard for {@link #call(byte[])}; cleared by {@link CloseEventHandler}. */
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskStop() {
+    LOG.fine("FailTaskStop created.");
+  }
+
+  /**
+   * Waits on this object's monitor until the close handler clears {@code isRunning}.
+   * An interrupt is logged and the wait is resumed.
+   *
+   * @param memento ignored.
+   * @return always {@code null}.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.fine("FailTaskStop.call() invoked. Waiting for the message.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException interrupted) {
+          LOG.log(Level.WARNING, "wait() interrupted.", interrupted);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Task-stop handler: always raises the simulated failure.
+   *
+   * @param event the stop notification; not inspected.
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Override
+  public void onNext(final TaskStop event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTaskStop.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskStop.onNext() invoked. Raise exception: {0}", simulated.toString());
+    throw simulated;
+  }
+
+  /**
+   * Close handler: clears the running flag and wakes up the thread waiting in {@code call()}.
+   */
+  public final class CloseEventHandler implements EventHandler<CloseEvent> {
+    @Override
+    public void onNext(final CloseEvent event) {
+      LOG.log(Level.FINEST, "FailTaskStop.CloseEventHandler.onNext() invoked: {0}", event);
+      // Same monitor that call() waits on.
+      synchronized (FailTaskStop.this) {
+        FailTaskStop.this.isRunning = false;
+        FailTaskStop.this.notify();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java
new file mode 100644
index 0000000..08fcfaf
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/FailTaskSuspend.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.fail.task;
+
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.tests.library.exceptions.SimulatedTaskFailure;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that waits inside {@code call()} and throws a simulated failure
+ * when it receives a {@link SuspendEvent}.
+ */
+public final class FailTaskSuspend implements Task, EventHandler<SuspendEvent> {
+
+  private static final Logger LOG = Logger.getLogger(FailTaskSuspend.class.getName());
+
+  /** Loop guard for {@link #call(byte[])}; no code in this class ever sets it to false. */
+  private transient boolean isRunning = true;
+
+  @Inject
+  public FailTaskSuspend() {
+    LOG.info("FailTaskSuspend created.");
+  }
+
+  /**
+   * Waits on this object's monitor for as long as {@code isRunning} is set.
+   * An interrupt is logged and the wait is resumed.
+   *
+   * @param memento ignored.
+   * @return an empty byte array if the loop ever ends.
+   */
+  @Override
+  public byte[] call(final byte[] memento) {
+    synchronized (this) {
+      LOG.info("FailTaskSuspend.call() invoked. Waiting for suspend request.");
+      while (this.isRunning) {
+        try {
+          this.wait();
+        } catch (final InterruptedException interrupted) {
+          LOG.log(Level.WARNING, "wait() interrupted.", interrupted);
+        }
+      }
+    }
+    return new byte[0];
+  }
+
+  /**
+   * Suspend handler: always raises the simulated failure.
+   * It neither clears {@code isRunning} nor notifies the thread waiting in {@code call()}.
+   *
+   * @param event the suspend request; not inspected.
+   * @throws SimulatedTaskFailure unconditionally.
+   */
+  @Override
+  public void onNext(final SuspendEvent event) throws SimulatedTaskFailure {
+    final SimulatedTaskFailure simulated = new SimulatedTaskFailure("FailTaskSuspend.send() invoked.");
+    LOG.log(Level.FINE, "FailTaskSuspend.send() invoked: {0}", simulated);
+    throw simulated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java
new file mode 100644
index 0000000..b035a09
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/ExpectedTaskFailureHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.driver;
+
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.tests.library.exceptions.ExpectedTaskException;
+import org.apache.reef.util.Exceptions;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * A handler for FailedTask that will throw a DriverSideFailure unless the FailedTask was triggered by an
+ * ExpectedTaskException in the Task.
+ */
+public final class ExpectedTaskFailureHandler implements EventHandler<FailedTask> {
+
+  @Inject
+  public ExpectedTaskFailureHandler() {
+  }
+
+  /**
+   * Checks whether the FailedTask was caused by an ExpectedTaskException and, if so,
+   * closes the context the task was running in.
+   *
+   * @param failedTask the failed task reported to the driver.
+   * @throws org.apache.reef.tests.library.exceptions.DriverSideFailure if the FailedTask carries no exception,
+   *                                                                    if its ultimate cause is not an
+   *                                                                    ExpectedTaskException, or if it
+   *                                                                    carries no active context to close.
+   */
+  @Override
+  public void onNext(final FailedTask failedTask) {
+    final Optional<Throwable> reasonOptional = failedTask.getReason();
+    if (!reasonOptional.isPresent()) {
+      throw new DriverSideFailure("Received a FailedTask, but it did not contain an exception.");
+    }
+
+    final Throwable reason = reasonOptional.get();
+    if (!(Exceptions.getUltimateCause(reason) instanceof ExpectedTaskException)) {
+      throw new DriverSideFailure("Received a FailedTask, but the ExpectedTaskException isn't the ultimate cause.",
+          reason);
+    }
+
+    // Check the Optional before unwrapping it, so a missing context is reported
+    // as a descriptive test failure rather than surfacing from get().
+    if (!failedTask.getActiveContext().isPresent()) {
+      throw new DriverSideFailure("Received an expected FailedTask, but it has no ActiveContext to close.");
+    }
+    failedTask.getActiveContext().get().close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java
new file mode 100644
index 0000000..4a74c04
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/driver/OnDriverStartedAllocateOne.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.driver;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+
+/**
+ * A Driver start handler that requests a single Evaluator of size 64MB with one core.
+ */
+public final class OnDriverStartedAllocateOne implements EventHandler<StartTime> {
+
+ private final EvaluatorRequestor requestor;
+
+ @Inject
+ OnDriverStartedAllocateOne(EvaluatorRequestor requestor) {
+ this.requestor = requestor;
+ }
+
+ @Override
+ public void onNext(final StartTime startTime) {
+ this.requestor.submit(EvaluatorRequest.newBuilder() // the StartTime event itself is not inspected
+ .setMemory(64) // 64MB per the class comment; the unit is not visible in this call
+ .setNumber(1)
+ .setNumberOfCores(1)
+ .build());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java
new file mode 100644
index 0000000..cfa5bb9
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ClientSideFailure.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the client side. Unchecked; the constructors mirror those of RuntimeException.
+ */
+public class ClientSideFailure extends RuntimeException {
+
+ public ClientSideFailure() {
+ }
+
+ public ClientSideFailure(final String string) {
+ super(string);
+ }
+
+ public ClientSideFailure(final String string, final Throwable thrwbl) {
+ super(string, thrwbl);
+ }
+
+ public ClientSideFailure(final Throwable thrwbl) {
+ super(thrwbl);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java
new file mode 100644
index 0000000..0d65d7e
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/DriverSideFailure.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the driver side. Unchecked; the constructors mirror those of RuntimeException.
+ */
+public class DriverSideFailure extends RuntimeException {
+
+ public DriverSideFailure() {
+ }
+
+ public DriverSideFailure(final String message) {
+ super(message);
+ }
+
+ public DriverSideFailure(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public DriverSideFailure(final Throwable cause) {
+ super(cause);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java
new file mode 100644
index 0000000..0c64e1a
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/ExpectedTaskException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Expected Exception thrown by a Task; ExpectedTaskFailureHandler treats it as the acceptable cause of a FailedTask.
+ */
+public final class ExpectedTaskException extends RuntimeException {
+ public ExpectedTaskException() {
+ }
+
+ public ExpectedTaskException(String s) {
+ super(s);
+ }
+
+ public ExpectedTaskException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public ExpectedTaskException(Throwable throwable) {
+ super(throwable);
+ }
+
+ public ExpectedTaskException(String s, Throwable throwable, boolean b, boolean b2) {
+ super(s, throwable, b, b2); // b = enableSuppression, b2 = writableStackTrace in RuntimeException's constructor
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java
new file mode 100644
index 0000000..7033853
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedDriverFailure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Unchecked exception that, judging by its name, is thrown to simulate a failure of the Driver (TODO confirm usage).
+ */
+public class SimulatedDriverFailure extends RuntimeException {
+
+ public SimulatedDriverFailure() {
+ super();
+ }
+
+ public SimulatedDriverFailure(final String message) {
+ super(message);
+ }
+
+ public SimulatedDriverFailure(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public SimulatedDriverFailure(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java
new file mode 100644
index 0000000..1ced6a4
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/SimulatedTaskFailure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Unchecked exception that, judging by its name, is thrown to simulate a failure of a Task (TODO confirm usage).
+ */
+public class SimulatedTaskFailure extends RuntimeException {
+
+ public SimulatedTaskFailure() {
+ super();
+ }
+
+ public SimulatedTaskFailure(final String message) {
+ super(message);
+ }
+
+ public SimulatedTaskFailure(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public SimulatedTaskFailure(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java
new file mode 100644
index 0000000..3fe0623
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/TaskSideFailure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown when a test fails on the task side. Unchecked; the constructors mirror those of RuntimeException.
+ */
+public class TaskSideFailure extends RuntimeException {
+
+ public TaskSideFailure() {
+ super();
+ }
+
+ public TaskSideFailure(final String message) {
+ super(message);
+ }
+
+ public TaskSideFailure(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public TaskSideFailure(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java
new file mode 100644
index 0000000..ef6318b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/exceptions/UnexpectedTaskReturnValue.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.exceptions;
+
+/**
+ * Thrown by the Driver in a test if a Task returned an unexpected value; both values are reported via toString() only.
+ */
+public final class UnexpectedTaskReturnValue extends RuntimeException {
+ private final String expected;
+ private final String actual;
+
+ public UnexpectedTaskReturnValue(final String expected, final String actual) {
+ this.expected = expected; // NOTE(review): no detail message is handed to the RuntimeException superclass
+ this.actual = actual;
+ }
+
+ @Override
+ public String toString() {
+ return "UnexpectedTaskReturnValue{" +
+ "expected='" + expected + '\'' +
+ ", actual='" + actual + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java
new file mode 100644
index 0000000..df3a0e3
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Commonly used event handlers, exceptions and task implementations shared by our tests.
+ */
+package org.apache.reef.tests.library;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java
new file mode 100644
index 0000000..e42076a
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/EchoTask.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.tasks;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * A Task that just sends the memento back: call() returns the byte array it was given, unchanged.
+ */
+public final class EchoTask implements Task {
+
+ @Inject
+ private EchoTask() {
+ }
+
+ @Override
+ public byte[] call(byte[] memento) throws Exception {
+ return memento;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java
new file mode 100644
index 0000000..c5186d4
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/library/tasks/NoopTask.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.library.tasks;
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+
+/**
+ * A Task that does nothing and returns null, whatever memento it is given.
+ */
+public class NoopTask implements Task {
+
+ @Inject
+ private NoopTask() {
+ }
+
+ @Override
+ public byte[] call(byte[] memento) throws Exception {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java
new file mode 100644
index 0000000..c8cdacc
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessaging.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.driver;
+
+import org.apache.reef.client.*;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public final class DriverMessaging {
+ // Client side of the driver messaging test: submits the job, sends one message and expects the same text back.
+ private static final Logger LOG = Logger.getLogger(DriverMessaging.class.getName());
+
+ private final REEF reef;
+ // Mutable state shared with the handler classes below; they access it inside synchronized (DriverMessaging.this).
+ private String lastMessage = null;
+ private Optional<RunningJob> theJob = Optional.empty();
+ private LauncherStatus status = LauncherStatus.INIT;
+
+ @Inject
+ private DriverMessaging(final REEF reef) {
+ this.reef = reef;
+ }
+ // Entry point: wires the handler classes below into a client configuration, then runs the injected instance.
+ public static LauncherStatus run(final Configuration runtimeConfiguration,
+ final int launcherTimeout) throws BindException, InjectionException {
+
+ final Configuration clientConfiguration = ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_RUNNING, DriverMessaging.RunningJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_MESSAGE, DriverMessaging.JobMessageHandler.class)
+ .set(ClientConfiguration.ON_JOB_COMPLETED, DriverMessaging.CompletedJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_FAILED, DriverMessaging.FailedJobHandler.class)
+ .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverMessaging.RuntimeErrorHandler.class)
+ .build();
+
+ return Tang.Factory.getTang()
+ .newInjector(runtimeConfiguration, clientConfiguration)
+ .getInstance(DriverMessaging.class).run(launcherTimeout, 1000); // 1000 = statusTimeout, in milliseconds
+ }
+ // Marks a still-running launcher as FORCE_CLOSED, closes the job if one is known, and wakes up run().
+ public synchronized void close() {
+ if (this.status.isRunning()) {
+ this.status = LauncherStatus.FORCE_CLOSED;
+ }
+ if (this.theJob.isPresent()) {
+ this.theJob.get().close();
+ }
+ this.notify();
+ }
+ // Submits the driver, then waits until the status is done or jobTimeout (ms) has elapsed; closes REEF afterwards.
+ private LauncherStatus run(final long jobTimeout, final long statusTimeout) {
+
+ final long startTime = System.currentTimeMillis();
+ LOG.log(Level.INFO, "Submitting REEF Job");
+
+ final Configuration driverConfig = DriverConfiguration.CONF
+ .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass()))
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, "DriverMessagingTest")
+ .set(DriverConfiguration.ON_DRIVER_STARTED, DriverMessagingDriver.StartHandler.class)
+ .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverMessagingDriver.AllocatedEvaluatorHandler.class)
+ .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverMessagingDriver.ClientMessageHandler.class)
+ .build();
+
+
+ this.reef.submit(driverConfig);
+
+ synchronized (this) {
+ while (!this.status.isDone()) {
+ LOG.log(Level.INFO, "Waiting for REEF job to finish.");
+ try {
+ this.wait(statusTimeout); // returns on notify() from a handler or after statusTimeout ms
+ } catch (final InterruptedException ex) {
+ LOG.log(Level.FINER, "Waiting for REEF job interrupted.", ex); // NOTE(review): interrupt flag is not restored
+ }
+ if (System.currentTimeMillis() - startTime >= jobTimeout) {
+ LOG.log(Level.INFO, "Waiting for REEF job timed out after {0} sec.",
+ (System.currentTimeMillis() - startTime) / 1000);
+ break;
+ }
+ }
+ }
+
+ this.reef.close();
+ return this.status; // NOTE(review): read outside the synchronized block above
+ }
+ // Fails the test if a message from the job differs from the one sent; a matching message changes nothing here.
+ final class JobMessageHandler implements EventHandler<JobMessage> {
+ @Override
+ public void onNext(final JobMessage message) {
+ final String msg = new String(message.get()); // decoded with the default charset
+ synchronized (DriverMessaging.this) {
+ if (!msg.equals(DriverMessaging.this.lastMessage)) {
+ LOG.log(Level.SEVERE, "Expected {0} but got {1}",
+ new Object[]{DriverMessaging.this.lastMessage, msg});
+ DriverMessaging.this.status = LauncherStatus.FAILED;
+ DriverMessaging.this.notify();
+ }
+ }
+ }
+ }
+ // Records the running job and sends it the test message "Hello, REEF!".
+ final class RunningJobHandler implements EventHandler<RunningJob> {
+ @Override
+ public void onNext(final RunningJob job) {
+ LOG.log(Level.INFO, "The Job {0} is running", job.getId());
+ synchronized (DriverMessaging.this) {
+ DriverMessaging.this.status = LauncherStatus.RUNNING;
+ DriverMessaging.this.theJob = Optional.of(job);
+ DriverMessaging.this.lastMessage = "Hello, REEF!";
+ DriverMessaging.this.theJob.get().send(DriverMessaging.this.lastMessage.getBytes()); // default charset
+ }
+ }
+ }
+ // Marks the launcher COMPLETED and wakes up run().
+ final class CompletedJobHandler implements EventHandler<CompletedJob> {
+ @Override
+ public void onNext(final CompletedJob job) {
+ LOG.log(Level.INFO, "Job Completed: {0}", job);
+ synchronized (DriverMessaging.this) {
+ DriverMessaging.this.status = LauncherStatus.COMPLETED;
+ DriverMessaging.this.notify();
+ }
+ }
+ }
+ // Logs the failure, stores a FAILED status built from the job's reason, and wakes up run().
+ final class FailedJobHandler implements EventHandler<FailedJob> {
+ @Override
+ public void onNext(final FailedJob job) {
+ LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), job.getReason().orElse(null));
+ synchronized (DriverMessaging.this) {
+ DriverMessaging.this.status = LauncherStatus.FAILED(job.getReason());
+ DriverMessaging.this.notify();
+ }
+ }
+ }
+ // Logs the runtime error, stores a FAILED status built from its reason, and wakes up run().
+ final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+ @Override
+ public void onNext(final FailedRuntime error) {
+ LOG.log(Level.SEVERE, "Received a runtime error: " + error, error.getReason().orElse(null));
+ synchronized (DriverMessaging.this) {
+ DriverMessaging.this.status = LauncherStatus.FAILED(error.getReason());
+ DriverMessaging.this.notify();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java
new file mode 100644
index 0000000..ac31c31
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/driver/DriverMessagingDriver.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.driver;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Dummy Driver for the driver messaging test: echoes client messages back and never expects an Evaluator.
+ */
+@Unit
+final class DriverMessagingDriver {
+
+ private static final Logger LOG = Logger.getLogger(DriverMessagingDriver.class.getName());
+
+ private static final int DELAY = 2000; // 2 sec.
+
+ private final Clock clock;
+ private final JobMessageObserver client;
+
+ @Inject
+ DriverMessagingDriver(final Clock clock, final JobMessageObserver client) {
+ this.clock = clock;
+ this.client = client;
+ }
+
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ // Schedule an alarm to not go idle immediately
+ clock.scheduleAlarm(DELAY, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm alarm) {
+ }
+ });
+ }
+ }
+
+ /**
+ * Sends the message back to the client and schedules an alarm in DELAY ms (2 sec.)
+ * such that the Driver does not immediately go idle.
+ */
+ final class ClientMessageHandler implements EventHandler<byte[]> {
+ @Override
+ public void onNext(final byte[] message) {
+ LOG.log(Level.INFO, "Message received: {0}", new String(message, java.nio.charset.StandardCharsets.UTF_8));
+ client.sendMessageToClient(message); // the original bytes are echoed unchanged; decoding is for the log only
+ clock.scheduleAlarm(DELAY, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm alarm) {
+ }
+ });
+ }
+ }
+
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+ throw new RuntimeException("This should never be called"); // this Driver never requests an Evaluator
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java
new file mode 100644
index 0000000..657c5a3
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingDriver.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.task;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.driver.task.TaskMessage;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tests.library.exceptions.DriverSideFailure;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver of the task messaging test: submits a TaskMessagingTask to every
+ * allocated Evaluator, sends HELLO_STR to the task DELAY ms after it is
+ * reported running, and fails if a task message differs from HELLO_STR.
+ */
+@Unit
+public final class TaskMessagingDriver {
+
+  private static final Logger LOG = Logger.getLogger(TaskMessagingDriver.class.getName());
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final byte[] HELLO_STR = CODEC.encode("MESSAGE::HELLO");
+  private static final int DELAY = 1000; // send message to Task 1 sec. after TaskRuntime
+
+  // The class is not Serializable, so the former 'transient' modifiers had no effect.
+  /** Injected client observer; not used by the handlers of this class. */
+  private final JobMessageObserver client;
+  /** Clock used to delay the message sent to a running task. */
+  private final Clock clock;
+
+  @Inject
+  public TaskMessagingDriver(final JobMessageObserver client, final Clock clock) {
+    this.client = client;
+    this.clock = clock;
+  }
+
+  /**
+   * Renders a task payload for log and error messages without ever failing.
+   * NOTE(review): assumes the codec reports undecodable bytes with an unchecked
+   * exception; decode() is called here without any checked-exception handling.
+   *
+   * @param payload raw bytes received from the task.
+   * @return the decoded value, or a dump of the raw bytes if decoding fails.
+   */
+  private static String describe(final byte[] payload) {
+    try {
+      return String.valueOf(CODEC.decode(payload));
+    } catch (final RuntimeException ex) {
+      return "<undecodable " + Arrays.toString(payload) + ">";
+    }
+  }
+
+  /**
+   * Submits a TaskMessagingTask, with its message handler and message source
+   * bound, to each allocated Evaluator.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      final String taskId = "Task_" + eval.getId();
+      LOG.log(Level.INFO, "Submit task: {0}", taskId);
+
+      final Configuration taskConfig = TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, taskId)
+          .set(TaskConfiguration.TASK, TaskMessagingTask.class)
+          .set(TaskConfiguration.ON_MESSAGE, TaskMessagingTask.DriverMessageHandler.class)
+          .set(TaskConfiguration.ON_SEND_MESSAGE, TaskMessagingTask.class)
+          .build();
+      eval.submitTask(taskConfig);
+    }
+  }
+
+  /**
+   * Schedules an alarm that sends HELLO_STR to the task DELAY ms after it started running.
+   */
+  public final class TaskRunningHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+      LOG.log(Level.FINE, "TaskRuntime: {0}", task.getId());
+      clock.scheduleAlarm(DELAY, new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm alarm) {
+          task.send(HELLO_STR);
+        }
+      });
+    }
+  }
+
+  /**
+   * Verifies that every message received from a task equals HELLO_STR.
+   */
+  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
+    /**
+     * @param msg message received from the task.
+     * @throws DriverSideFailure if the payload differs from HELLO_STR, including
+     *     payloads that cannot be decoded at all.
+     */
+    @Override
+    public void onNext(final TaskMessage msg) {
+      // Read the payload once and render it defensively, so that a malformed
+      // message is reported as a DriverSideFailure instead of a codec error.
+      final byte[] payload = msg.get();
+      final String text = describe(payload);
+      LOG.log(Level.FINE, "TaskMessage: from {0}: {1}", new Object[]{msg.getId(), text});
+      if (!Arrays.equals(payload, HELLO_STR)) {
+        final RuntimeException ex = new DriverSideFailure("Unexpected message: " + text);
+        LOG.log(Level.SEVERE, "Bad message from " + msg.getId(), ex);
+        throw ex;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java
new file mode 100644
index 0000000..4c60c56
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/messaging/task/TaskMessagingTask.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.messaging.task;
+
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A basic task that receives a message and sends it back to the driver.
+ * <p>
+ * call() blocks until getMessage() has handed out a message that was stored
+ * by DriverMessageHandler. The mutable fields are guarded by this instance's monitor.
+ */
+@Unit
+public final class TaskMessagingTask implements Task, TaskMessageSource {
+
+  private static final Logger LOG = Logger.getLogger(TaskMessagingTask.class.getName());
+  private static final ObjectSerializableCodec<String> CODEC = new ObjectSerializableCodec<>();
+  private static final TaskMessage INIT_MESSAGE = TaskMessage.from("", CODEC.encode("MESSAGE::INIT"));
+
+  // The class is not Serializable, so the former 'transient' modifiers had no
+  // effect; visibility across threads is provided by synchronizing on 'this'.
+  /** True until getMessage() has seen a stored message; guarded by {@code this}. */
+  private boolean isRunning = true;
+  /** Message stored by DriverMessageHandler, if any; guarded by {@code this}. */
+  private Optional<TaskMessage> message = Optional.empty();
+
+  @Inject
+  public TaskMessagingTask() {
+    LOG.info("TaskMsg created.");
+  }
+
+  /**
+   * Waits until getMessage() clears the running flag, then returns the payload
+   * of the stored message (or of INIT_MESSAGE if none is stored).
+   *
+   * @param memento not used.
+   * @return the payload bytes of the stored message.
+   */
+  @Override
+  public synchronized byte[] call(final byte[] memento) {
+    LOG.info("TaskMsg.call() invoked. Waiting for the message.");
+    while (this.isRunning) {
+      try {
+        this.wait();
+      } catch (final InterruptedException ex) {
+        // Deliberate best effort: log the interrupt and keep waiting for the message.
+        LOG.log(Level.WARNING, "wait() interrupted.", ex);
+      }
+    }
+    return this.message.orElse(INIT_MESSAGE).get();
+  }
+
+  /**
+   * Returns the stored message, if any. Once a message is present, this also
+   * clears the running flag and wakes up call().
+   *
+   * @return the message stored by DriverMessageHandler, or empty if none arrived yet.
+   */
+  @Override
+  public synchronized Optional<TaskMessage> getMessage() {
+    LOG.log(Level.INFO, "TaskMsg.getMessage() invoked: {0}",
+        CODEC.decode(this.message.orElse(INIT_MESSAGE).get()));
+    if (this.message.isPresent()) {
+      this.isRunning = false;
+      this.notify();
+    }
+    return this.message;
+  }
+
+  /**
+   * Stores each message received from the driver so that getMessage() can return it.
+   */
+  public class DriverMessageHandler implements EventHandler<DriverMessage> {
+    @Override
+    public void onNext(DriverMessage driverMessage) {
+      final byte[] message = driverMessage.get().get();
+      LOG.log(Level.INFO, "TaskMsg.send() invoked: {0}", CODEC.decode(message));
+      final Optional<TaskMessage> received = Optional.of(TaskMessage.from(this.toString(), message));
+      // Publish under the same monitor that call() and getMessage() hold;
+      // an unsynchronized write is a data race with those readers.
+      synchronized (TaskMessagingTask.this) {
+        TaskMessagingTask.this.message = received;
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java
new file mode 100644
index 0000000..e833e43
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/Counter.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.statepassing;
+
+import javax.inject.Inject;
+
+/**
+ * A mutable integer counter that starts at zero.
+ */
+public class Counter {
+
+  /** Current count; relies on the default initial value of zero. */
+  private int value;
+
+  @Inject
+  public Counter() {
+  }
+
+  /**
+   * Adds one to the counter.
+   */
+  public void increment() {
+    ++this.value;
+  }
+
+  /**
+   * @return the current count.
+   */
+  public int getValue() {
+    return this.value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java
new file mode 100644
index 0000000..4b7bf39
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingDriver.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.statepassing;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.ServiceConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver of the state passing test: starts a context with a Counter service on
+ * each allocated Evaluator and runs StatePassingTask on it PASSES times. After
+ * each task it checks that the result is {@code pass} bytes long, all of value 1.
+ */
+@Unit
+public class StatePassingDriver {
+
+  private static final Logger LOG = Logger.getLogger(StatePassingDriver.class.getName());
+
+  private static final int PASSES = 2;
+  private final JobMessageObserver client;
+  /** Number of tasks submitted so far. */
+  private int pass = 0;
+
+  @Inject
+  public StatePassingDriver(final JobMessageObserver client) {
+    this.client = client;
+  }
+
+  /**
+   * @param value the expected value of every element.
+   * @param bytes the array to check.
+   * @return true if every element of bytes equals value (trivially true for an empty array).
+   */
+  private static boolean allEqual(final byte value, final byte[] bytes) {
+    for (final byte b : bytes) {
+      if (b != value) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Submits the next StatePassingTask to the given context and counts it.
+   *
+   * @param activeContext the context to run the task on.
+   */
+  private void nextPass(final ActiveContext activeContext) {
+    try {
+      activeContext.submitTask(TaskConfiguration.CONF
+          .set(TaskConfiguration.IDENTIFIER, "StatePassing-" + pass)
+          .set(TaskConfiguration.TASK, StatePassingTask.class)
+          .build());
+      // Counted only after the submission went through.
+      ++pass;
+    } catch (final BindException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Submits a context together with the Counter service to each allocated Evaluator.
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator eb) {
+      // The configuration builder the original created here was never used and has been dropped.
+      try {
+        final Configuration contextConfiguration = ContextConfiguration.CONF
+            .set(ContextConfiguration.IDENTIFIER, "StatePassingContext")
+            .build();
+
+        final Configuration serviceConfiguration = ServiceConfiguration.CONF
+            .set(ServiceConfiguration.SERVICES, Counter.class)
+            .build();
+
+        eb.submitContextAndService(contextConfiguration, serviceConfiguration);
+      } catch (final BindException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Starts the first pass as soon as the context is active.
+   */
+  final class ContextActiveHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      nextPass(activeContext);
+    }
+  }
+
+  /**
+   * Validates the result of a completed task, then either submits the next
+   * pass or closes the context once PASSES tasks have been submitted.
+   */
+  final class TaskCompletedHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask completed) {
+      LOG.log(Level.INFO, "Received a completed task: {0}", completed);
+      final byte[] message = completed.get();
+
+      if (message.length != pass) {
+        throw new RuntimeException(
+            "Expected message of length " + pass + ", but got message of length " + message.length);
+      }
+      if (!allEqual((byte) 1, message)) {
+        throw new RuntimeException("Did not get the right message");
+      }
+
+      if (pass < PASSES) {
+        LOG.log(Level.INFO, "Submitting the next Task");
+        nextPass(completed.getActiveContext());
+      } else {
+        LOG.log(Level.INFO, "Done");
+        completed.getActiveContext().close();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java
new file mode 100644
index 0000000..d7176dc
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/statepassing/StatePassingTask.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.statepassing;
+
+
+import org.apache.reef.task.Task;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+
+/**
+ * Task that increments the injected Counter and returns an array of ones
+ * whose length is the counter's new value.
+ */
+public class StatePassingTask implements Task {
+
+  private final Counter counter;
+
+  @Inject
+  public StatePassingTask(final Counter c) {
+    this.counter = c;
+  }
+
+  /**
+   * @param size  length of the array to create.
+   * @param value value stored in every element.
+   * @return a new array of the given size filled with value.
+   */
+  private static byte[] makeArray(final int size, final byte value) {
+    final byte[] filled = new byte[size];
+    Arrays.fill(filled, value);
+    return filled;
+  }
+
+  /**
+   * @param memento not used.
+   * @return an array of ones, as long as the counter value after this increment.
+   */
+  @Override
+  public byte[] call(byte[] memento) throws Exception {
+    this.counter.increment();
+    final int length = this.counter.getValue();
+    return makeArray(length, (byte) 1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
new file mode 100644
index 0000000..1cc7abd
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/yarn/failure/FailureDriver.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.yarn.failure;
+
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.poison.PoisonedConfiguration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver that requests NUM_EVALUATORS Evaluators, submits a poisoned context
+ * to the first NUM_FAILURES allocations and closes every later allocation.
+ * Each failed Evaluator triggers a request for one more Evaluator.
+ */
+@Unit
+public class FailureDriver {
+
+  private static final Logger LOG = Logger.getLogger(FailureDriver.class.getName());
+  private static final int NUM_EVALUATORS = 40;
+  private static final int NUM_FAILURES = 10;
+
+  /** Poisoned contexts still to be submitted; keeps decreasing below zero once used up. */
+  private final AtomicInteger toSubmit = new AtomicInteger(NUM_FAILURES);
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  public FailureDriver(final EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+    LOG.info("Driver instantiated");
+  }
+
+  /**
+   * Handles the StartTime event: requests NUM_EVALUATORS Evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.FINE, "Request {0} Evaluators.", NUM_EVALUATORS);
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(NUM_EVALUATORS)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator: submits a poisoned context while poisoned
+   * submissions remain, otherwise closes the Evaluator.
+   */
+  final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String evalId = allocatedEvaluator.getId();
+      LOG.log(Level.FINE, "Got allocated evaluator: {0}", evalId);
+      // Capture the value claimed by this event; re-reading the shared counter
+      // for the log could report a number changed by a concurrent allocation.
+      final int remaining = toSubmit.getAndDecrement();
+      if (remaining > 0) {
+        LOG.log(Level.FINE, "Submitting poisoned context. {0} to go.", remaining - 1);
+        allocatedEvaluator.submitContext(
+            Tang.Factory.getTang()
+                .newConfigurationBuilder(
+                    ContextConfiguration.CONF
+                        .set(ContextConfiguration.IDENTIFIER, "Poisoned Context: " + evalId)
+                        .build(),
+                    PoisonedConfiguration.CONTEXT_CONF
+                        .set(PoisonedConfiguration.CRASH_PROBABILITY, "1")
+                        .set(PoisonedConfiguration.CRASH_TIMEOUT, "1")
+                        .build())
+                .build());
+      } else {
+        LOG.log(Level.FINE, "Closing evaluator {0}", evalId);
+        allocatedEvaluator.close();
+      }
+    }
+  }
+
+  /**
+   * Handles FailedEvaluator: requests one more Evaluator.
+   */
+  final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      LOG.log(Level.FINE, "Got failed evaluator: {0} - re-request", failedEvaluator.getId());
+      FailureDriver.this.requestor.submit(EvaluatorRequest.newBuilder()
+          .setNumber(1)
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .build());
+    }
+  }
+}