You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/04/30 22:38:15 UTC
[1/4] reef git commit: [REEF-2012] Add driver restart capabilities to
reef runtime mock
Repository: reef
Updated Branches:
refs/heads/master 35df8205f -> 5ed56eba3
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
deleted file mode 100644
index 0b073c8..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockUtils.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.exceptions.InjectionException;
-
-/**
- * mock utilities.
- */
-@Unstable
-@Private
-final class MockUtils {
-
- private MockUtils() {
- }
-
- public static <U, T extends Name<U>> U getValue(final Configuration configuration, final Class<T> name) {
- try {
- final Injector injector = Tang.Factory.getTang().newInjector(configuration);
- return injector.getNamedInstance(name);
- } catch (InjectionException e) {
- throw new IllegalStateException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
deleted file mode 100644
index b5cf639..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-/**
- * mock runtime implementation.
- */
-package org.apache.reef.mock.runtime;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
deleted file mode 100644
index 984a9f4..0000000
--- a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/BasicMockTests.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.mock.request.ProcessRequestInternal;
-import org.apache.reef.mock.runtime.MockAllocatedEvalautor;
-import org.apache.reef.mock.runtime.MockClock;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * basic mock tests.
- */
-final class BasicMockTests {
-
- private MockApplication mockApplication;
-
- private MockRuntime mockRuntime;
-
- private MockClock mockClock;
-
- @Before
- public void initialize() throws Exception {
- final Configuration conf = MockConfiguration.CONF
- .set(MockConfiguration.ON_DRIVER_STARTED, MockApplication.StartHandler.class)
- .set(MockConfiguration.ON_DRIVER_STOP, MockApplication.StopHandler.class)
- .set(MockConfiguration.ON_CONTEXT_ACTIVE, MockApplication.ActiveContextHandler.class)
- .set(MockConfiguration.ON_CONTEXT_CLOSED, MockApplication.ContextClosedHandler.class)
- .set(MockConfiguration.ON_CONTEXT_FAILED, MockApplication.FailedContextHandler.class)
- .set(MockConfiguration.ON_EVALUATOR_ALLOCATED, MockApplication.AllocatedEvaluatorHandler.class)
- .set(MockConfiguration.ON_EVALUATOR_COMPLETED, MockApplication.CompletedEvaluatorHandler.class)
- .set(MockConfiguration.ON_EVALUATOR_FAILED, MockApplication.FailedEvaluatorHandler.class)
- .set(MockConfiguration.ON_TASK_COMPLETED, MockApplication.CompletedTaskHandler.class)
- .set(MockConfiguration.ON_TASK_FAILED, MockApplication.FailedTaskHandler.class)
- .set(MockConfiguration.ON_TASK_RUNNING, MockApplication.RunningTaskHandler.class)
- .set(MockConfiguration.ON_TASK_SUSPENDED, MockApplication.SuspendedTaskHandler.class)
- .build();
-
- final Injector injector = Tang.Factory.getTang().newInjector(conf);
- this.mockApplication = injector.getInstance(MockApplication.class);
- this.mockRuntime = injector.getInstance(MockRuntime.class);
- this.mockClock = injector.getInstance(MockClock.class);
-
- this.mockClock.run();
- }
-
- @Test
- public void testSuccessRequests() throws Exception {
- assertTrue("mock application received start event", this.mockApplication.isRunning());
-
- this.mockApplication.requestEvaluators(1);
- assertTrue("check for process event", this.mockRuntime.hasProcessRequest());
- final ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
- assertEquals("allocate evalautor request", ProcessRequest.Type.ALLOCATE_EVALUATOR,
- allocateEvaluatorRequest.getType());
- final AllocatedEvaluator evaluator =
- ((ProcessRequestInternal<AllocatedEvaluator, Object>)allocateEvaluatorRequest)
- .getSuccessEvent();
- this.mockRuntime.succeed(allocateEvaluatorRequest);
- assertTrue("evaluator allocation succeeded",
- this.mockApplication.getAllocatedEvaluators().contains(evaluator));
- final ActiveContext rootContext = this.mockApplication.getContext(evaluator,
- MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId());
- assertTrue("root context", rootContext != null);
-
-
- // submit a task
- this.mockApplication.submitTask(rootContext, "test-task");
- assertTrue("create task queued", this.mockRuntime.hasProcessRequest());
- final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
- assertEquals("create task request", ProcessRequest.Type.CREATE_TASK,
- createTaskRequest.getType());
- final RunningTask task = (RunningTask) ((ProcessRequestInternal)createTaskRequest).getSuccessEvent();
- this.mockRuntime.succeed(createTaskRequest);
- assertTrue("task running", this.mockApplication.getRunningTasks().contains(task));
-
- // check task auto complete
- assertTrue("check for request", this.mockRuntime.hasProcessRequest());
- final ProcessRequestInternal completedTask =
- (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
- assertEquals("complete task request", ProcessRequest.Type.COMPLETE_TASK,
- completedTask.getType());
- this.mockRuntime.succeed(completedTask);
- assertEquals("no running tasks", 0, this.mockApplication.getRunningTasks().size());
-
- // create a sub-context
- this.mockApplication.submitContext(rootContext, "child");
- assertTrue("check for request", this.mockRuntime.hasProcessRequest());
- final ProcessRequestInternal createContextRequest =
- (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
- assertEquals("create context request", ProcessRequest.Type.CREATE_CONTEXT,
- createContextRequest.getType());
- this.mockRuntime.succeed(createContextRequest);
- final ActiveContext context = this.mockApplication.getContext(evaluator, "child");
- assertTrue("child context", context.getParentId().get().equals(rootContext.getId()));
- }
-
- @Test
- public void testFailureRequests() throws Exception {
- assertTrue("mock application received start event", this.mockApplication.isRunning());
-
- this.mockApplication.requestEvaluators(1);
- assertTrue("check for process event", this.mockRuntime.hasProcessRequest());
- ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
- this.mockRuntime.fail(allocateEvaluatorRequest);
- assertEquals("evaluator allocation failed", 1,
- this.mockApplication.getFailedEvaluators().size());
-
- this.mockApplication.requestEvaluators(1);
- allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
- final AllocatedEvaluator evaluator =
- (AllocatedEvaluator)((ProcessRequestInternal)allocateEvaluatorRequest).getSuccessEvent();
- this.mockRuntime.succeed(allocateEvaluatorRequest);
- final ActiveContext rootContext = this.mockApplication
- .getContext(evaluator, MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId());
-
-
- // submit a task
- this.mockApplication.submitTask(rootContext, "test-task");
- assertTrue("create task queued", this.mockRuntime.hasProcessRequest());
- final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
- assertEquals("create task request", ProcessRequest.Type.CREATE_TASK,
- createTaskRequest.getType());
- this.mockRuntime.fail(createTaskRequest);
- assertEquals("task running", 1, this.mockApplication.getFailedTasks().size());
-
- // create a sub-context
- this.mockApplication.submitContext(rootContext, "child");
- assertTrue("check for request", this.mockRuntime.hasProcessRequest());
- final ProcessRequestInternal createContextRequest =
- (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
- this.mockRuntime.fail(createContextRequest);
- assertEquals("child context", 1, this.mockApplication.getFailedContext().size());
- }
-
- @Test
- public void testMockFailures() {
- // make sure we're running
- assertTrue("mock application received start event", this.mockApplication.isRunning());
-
- // allocate an evaluator and get root context
- this.mockApplication.requestEvaluators(1);
- this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest());
- final AllocatedEvaluator evaluator = this.mockRuntime.getCurrentAllocatedEvaluators().iterator().next();
- final ActiveContext rootContext = this.mockApplication.getContext(evaluator,
- MockAllocatedEvalautor.ROOT_CONTEXT_IDENTIFIER_PREFIX + evaluator.getId());
-
- // create a child context off of root context
- this.mockApplication.submitContext(rootContext, "child");
- this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest());
- final ActiveContext childContext = this.mockApplication.getContext(evaluator, "child");
-
- // submit a task from child context
- this.mockApplication.submitTask(childContext, "test-task");
- final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
- createTaskRequest.setAutoComplete(false); // keep it running
- this.mockRuntime.succeed(createTaskRequest);
- final RunningTask task = this.mockRuntime.getCurrentRunningTasks().iterator().next();
-
- // fail task
- this.mockRuntime.fail(task);
- assertEquals("task failed", 1, this.mockApplication.getFailedTasks().size());
-
- // fail child context
- this.mockRuntime.fail(childContext);
- assertTrue("child context failed",
- this.mockApplication.getFailedContext().iterator().next().getId().equals(childContext.getId()));
- // evaluator should still be up
- assertEquals("check evaluator", 0, this.mockApplication.getFailedEvaluators().size());
-
- // fail evaluator
- this.mockRuntime.fail(evaluator);
- assertEquals("evaluator failed", 1, this.mockApplication.getFailedEvaluators().size());
-
- // both contexts should be failed
- assertEquals("root and child contexts failed", 2,
- this.mockApplication.getFailedContext().size());
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
deleted file mode 100644
index 86a105e..0000000
--- a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/MockApplication.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.tang.annotations.Unit;
-import org.apache.reef.task.Task;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.event.Alarm;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * mock application.
- */
-@Unit
-final class MockApplication {
-
- private static final Logger LOG = Logger.getLogger(MockApplication.class.getName());
-
- private final Clock clock;
-
- private final EvaluatorRequestor evaluatorRequestor;
-
- private final Map<String, Map<String, ActiveContext>> evaluatorId2ContextId2ContextMap = new HashMap<>();
-
- private final Map<String, AllocatedEvaluator> evaluatorMap = new HashMap<>();
-
- private final Map<String, FailedEvaluator> failedEvaluatorMap = new HashMap<>();
-
- private final Map<String, RunningTask> evaluatorIdRunningTaskMap = new HashMap<>();
-
- private final Set<FailedContext> failedContextSet = new HashSet<>();
-
- private final Set<FailedTask> failedTaskSet = new HashSet<>();
-
- private final Set<SuspendedTask> suspendedTaskSet = new HashSet<>();
-
- private boolean running = false;
-
- @Inject
- MockApplication(final Clock clock, final EvaluatorRequestor evaluatorRequestor) {
- this.clock = clock;
- this.evaluatorRequestor = evaluatorRequestor;
- }
-
- ActiveContext getContext(final AllocatedEvaluator evaluator, final String identifier) {
- return this.evaluatorId2ContextId2ContextMap.get(evaluator.getId()).get(identifier);
- }
-
- Collection<RunningTask> getRunningTasks() {
- return Collections.unmodifiableCollection(this.evaluatorIdRunningTaskMap.values());
- }
-
- Collection<AllocatedEvaluator> getAllocatedEvaluators() {
- return Collections.unmodifiableCollection(this.evaluatorMap.values());
- }
-
- Collection<FailedEvaluator> getFailedEvaluators() {
- return Collections.unmodifiableCollection(this.failedEvaluatorMap.values());
- }
-
- Collection<FailedTask> getFailedTasks() {
- return Collections.unmodifiableCollection(this.failedTaskSet);
- }
-
- Collection<FailedContext> getFailedContext() {
- return Collections.unmodifiableCollection(this.failedContextSet);
- }
-
- void requestEvaluators(final int numEvaluators) {
- LOG.log(Level.INFO, "request {0} Evaluators", numEvaluators);
- evaluatorRequestor.newRequest()
- .setMemory(128)
- .setNumberOfCores(1)
- .setNumber(numEvaluators)
- .submit();
- }
-
- void submitTask(final ActiveContext context, final String identifier) {
- context.submitTask(TaskConfiguration.CONF
- .set(TaskConfiguration.IDENTIFIER, identifier)
- .set(TaskConfiguration.TASK, DummyTestTask.class)
- .build());
- }
-
- void submitContext(final ActiveContext context, final String identifier) {
- context.submitContext(ContextConfiguration.CONF
- .set(ContextConfiguration.IDENTIFIER, identifier)
- .build());
- }
-
- boolean isRunning() {
- return this.running;
- }
-
- boolean exists(final AllocatedEvaluator evaluator) {
- return this.evaluatorMap.containsKey(evaluator.getId());
- }
-
- /**
- * Job Driver is ready and the clock is set up: request the evaluatorMap.
- */
- final class StartHandler implements EventHandler<StartTime> {
- @Override
- public void onNext(final StartTime startTime) {
- clock.scheduleAlarm(Integer.MAX_VALUE, new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm value) {
- throw new RuntimeException("should not happen");
- }
- });
- running = true;
- }
- }
-
- /**
- * Job Driver is is shutting down: write to the log.
- */
- final class StopHandler implements EventHandler<StopTime> {
- @Override
- public void onNext(final StopTime stopTime) {
- running = false;
- }
- }
-
- /**
- * Receive notification that an Evaluator had been allocated,
- * and submitTask a new Task in that Evaluator.
- */
- final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
- @Override
- public void onNext(final AllocatedEvaluator eval) {
- evaluatorMap.put(eval.getId(), eval);
- }
- }
-
- /**
- * Receive notification that the Evaluator has been shut down.
- */
- final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
- @Override
- public void onNext(final CompletedEvaluator eval) {
- evaluatorMap.remove(eval.getId());
- evaluatorId2ContextId2ContextMap.remove(eval.getId());
- evaluatorIdRunningTaskMap.remove(eval.getId());
- }
- }
-
- final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
-
- @Override
- public void onNext(final FailedEvaluator eval) {
- evaluatorMap.remove(eval.getId());
- evaluatorId2ContextId2ContextMap.remove(eval.getId());
- evaluatorIdRunningTaskMap.remove(eval.getId());
- failedEvaluatorMap.put(eval.getId(), eval);
- failedContextSet.addAll(eval.getFailedContextList());
- }
- }
-
- /**
- * Receive notification that the Context is active.
- */
- final class ActiveContextHandler implements EventHandler<ActiveContext> {
- @Override
- public void onNext(final ActiveContext context) {
- if (!evaluatorId2ContextId2ContextMap.containsKey(context.getEvaluatorId())) {
- evaluatorId2ContextId2ContextMap.put(context.getEvaluatorId(), new HashMap<String, ActiveContext>());
- }
- if (evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).containsKey(context.getId())) {
- throw new IllegalStateException(
- String.format("Context %s on evaluator %s already exists on evaluator with " +
- "same identifier", context.getId(), context.getEvaluatorId()));
- }
- evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).put(context.getId(), context);
- }
- }
-
- final class ContextClosedHandler implements EventHandler<ClosedContext> {
- @Override
- public void onNext(final ClosedContext value) {
- assert evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId());
- assert evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId());
- evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getId());
- }
- }
-
- final class FailedContextHandler implements EventHandler<FailedContext> {
- @Override
- public void onNext(final FailedContext value) {
- if (evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId()) &&
- evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId())) {
- evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getEvaluatorId());
- } else {
- // must have failed before it succeeded
- }
- failedContextSet.add(value);
- }
- }
-
- /**
- * Receive notification that the Task is running.
- */
- final class RunningTaskHandler implements EventHandler<RunningTask> {
- @Override
- public void onNext(final RunningTask task) {
- evaluatorIdRunningTaskMap.put(task.getActiveContext().getEvaluatorId(), task);
- }
- }
-
- /**
- * Receive notification that the Task has completed successfully.
- */
- final class CompletedTaskHandler implements EventHandler<CompletedTask> {
- @Override
- public void onNext(final CompletedTask task) {
- evaluatorIdRunningTaskMap.remove(task.getActiveContext().getEvaluatorId());
- }
- }
-
- final class FailedTaskHandler implements EventHandler<FailedTask> {
- @Override
- public void onNext(final FailedTask value) {
- evaluatorIdRunningTaskMap.remove(value.getActiveContext().get().getEvaluatorId());
- failedTaskSet.add(value);
- }
- }
-
- final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
- @Override
- public void onNext(final SuspendedTask value) {
- evaluatorIdRunningTaskMap.remove(value.getActiveContext().getEvaluatorId());
- suspendedTaskSet.add(value);
- }
- }
-
- private static final class DummyTestTask implements Task {
- @Override
- public byte[] call(final byte[] memento) throws Exception {
- return new byte[0];
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java
new file mode 100644
index 0000000..8b52295
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/BasicMockTests.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.driver.request.ProcessRequestInternal;
+import org.apache.reef.mock.driver.runtime.MockAllocatedEvaluator;
+import org.apache.reef.mock.driver.runtime.MockClock;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Basic tests of the mock driver runtime: successful requests, failed requests, and evaluator failure.
+ */
+public final class BasicMockTests {
+
+ private MockApplication mockApplication;
+
+ private MockRuntime mockRuntime;
+
+ private MockClock mockClock;
+
+ @Before
+ public void initialize() throws Exception {
+ final Configuration conf = MockConfiguration.CONF
+ .set(MockConfiguration.ON_DRIVER_STARTED, MockApplication.StartHandler.class)
+ .set(MockConfiguration.ON_DRIVER_STOP, MockApplication.StopHandler.class)
+ .set(MockConfiguration.ON_CONTEXT_ACTIVE, MockApplication.ActiveContextHandler.class)
+ .set(MockConfiguration.ON_CONTEXT_CLOSED, MockApplication.ContextClosedHandler.class)
+ .set(MockConfiguration.ON_CONTEXT_FAILED, MockApplication.FailedContextHandler.class)
+ .set(MockConfiguration.ON_EVALUATOR_ALLOCATED, MockApplication.AllocatedEvaluatorHandler.class)
+ .set(MockConfiguration.ON_EVALUATOR_COMPLETED, MockApplication.CompletedEvaluatorHandler.class)
+ .set(MockConfiguration.ON_EVALUATOR_FAILED, MockApplication.FailedEvaluatorHandler.class)
+ .set(MockConfiguration.ON_TASK_COMPLETED, MockApplication.CompletedTaskHandler.class)
+ .set(MockConfiguration.ON_TASK_FAILED, MockApplication.FailedTaskHandler.class)
+ .set(MockConfiguration.ON_TASK_RUNNING, MockApplication.RunningTaskHandler.class)
+ .set(MockConfiguration.ON_TASK_SUSPENDED, MockApplication.SuspendedTaskHandler.class)
+ .build();
+
+ final Injector injector = Tang.Factory.getTang().newInjector(conf);
+ this.mockApplication = injector.getInstance(MockApplication.class);
+ this.mockRuntime = injector.getInstance(MockRuntime.class);
+ this.mockClock = injector.getInstance(MockClock.class);
+
+ this.mockClock.run();
+ }
+
+ @Test
+ public void testSuccessRequests() throws Exception {
+ assertTrue("mock application received start event", this.mockApplication.isRunning());
+
+ this.mockApplication.requestEvaluators(1);
+ assertTrue("check for process event", this.mockRuntime.hasProcessRequest());
+ final ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
+ assertEquals("allocate evaluator request", ProcessRequest.Type.ALLOCATE_EVALUATOR,
+ allocateEvaluatorRequest.getType());
+ final AllocatedEvaluator evaluator =
+ ((ProcessRequestInternal<AllocatedEvaluator, Object>)allocateEvaluatorRequest)
+ .getSuccessEvent();
+ this.mockRuntime.succeed(allocateEvaluatorRequest);
+ assertTrue("evaluator allocation succeeded",
+ this.mockApplication.getAllocatedEvaluators().contains(evaluator));
+ final String contextId = "foo";
+ this.mockApplication.submitContext(evaluator, contextId);
+ final ActiveContext rootContext = ((MockAllocatedEvaluator) evaluator).getRootContext();
+ assertTrue("root context", rootContext != null);
+
+
+ // submit a task
+ this.mockApplication.submitTask(rootContext, "test-task");
+ assertTrue("create task queued", this.mockRuntime.hasProcessRequest());
+ final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
+ assertEquals("create task request", ProcessRequest.Type.CREATE_TASK,
+ createTaskRequest.getType());
+ final RunningTask task = (RunningTask) ((ProcessRequestInternal)createTaskRequest).getSuccessEvent();
+ this.mockRuntime.succeed(createTaskRequest);
+ assertTrue("task running", this.mockApplication.getRunningTasks().contains(task));
+
+ // check task auto complete
+ assertTrue("check for request", this.mockRuntime.hasProcessRequest());
+ final ProcessRequestInternal completedTask =
+ (ProcessRequestInternal) this.mockRuntime.getNextProcessRequest();
+ assertEquals("complete task request", ProcessRequest.Type.COMPLETE_TASK,
+ completedTask.getType());
+ this.mockRuntime.succeed(completedTask);
+ assertEquals("no running tasks", 0, this.mockApplication.getRunningTasks().size());
+ }
+
+ @Test
+ public void testFailureRequests() throws Exception {
+ assertTrue("mock application received start event", this.mockApplication.isRunning());
+
+ this.mockApplication.requestEvaluators(1);
+ assertTrue("check for process event", this.mockRuntime.hasProcessRequest());
+ ProcessRequest allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
+ this.mockRuntime.fail(allocateEvaluatorRequest);
+ assertEquals("evaluator allocation failed", 1,
+ this.mockApplication.getFailedEvaluators().size());
+
+ this.mockApplication.requestEvaluators(1);
+ allocateEvaluatorRequest = this.mockRuntime.getNextProcessRequest();
+ final AllocatedEvaluator evaluator =
+ (AllocatedEvaluator)((ProcessRequestInternal)allocateEvaluatorRequest).getSuccessEvent();
+ this.mockRuntime.succeed(allocateEvaluatorRequest);
+ this.mockApplication.submitContext(evaluator, "FOO");
+ final ActiveContext rootContext = this.mockApplication
+ .getContext(evaluator, "FOO");
+
+
+ // submit a task
+ this.mockApplication.submitTask(rootContext, "test-task");
+ assertTrue("create task queued", this.mockRuntime.hasProcessRequest());
+ final ProcessRequest createTaskRequest = this.mockRuntime.getNextProcessRequest();
+ assertEquals("create task request", ProcessRequest.Type.CREATE_TASK,
+ createTaskRequest.getType());
+ this.mockRuntime.fail(createTaskRequest);
+ assertEquals("task failed", 1, this.mockApplication.getFailedTasks().size());
+ }
+
+ @Test
+ public void testMockFailures() {
+ // make sure we're running
+ assertTrue("mock application received start event", this.mockApplication.isRunning());
+
+ // allocate an evaluator and get root context
+ this.mockApplication.requestEvaluators(1);
+ this.mockRuntime.succeed(this.mockRuntime.getNextProcessRequest());
+ final AllocatedEvaluator evaluator = this.mockRuntime.getCurrentAllocatedEvaluators().iterator().next();
+ this.mockApplication.submitContext(evaluator, "FOO");
+ // fail evaluator
+ this.mockRuntime.fail(evaluator);
+ assertEquals("evaluator failed", 1, this.mockApplication.getFailedEvaluators().size());
+
+ // both contexts should be failed -- NOTE(review): only "FOO" is submitted here; confirm the expected count of 2
+ assertEquals("root and child contexts failed", 2,
+ this.mockApplication.getFailedContext().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java
new file mode 100644
index 0000000..3ce66b2
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/MockApplication.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Mock driver application used to exercise the mock runtime in tests.
+ * The nested event handlers record the evaluators, contexts and tasks they are
+ * notified about; the package-private accessors expose that state to the tests.
+ */
+@Unit
+final class MockApplication {
+
+ private static final Logger LOG = Logger.getLogger(MockApplication.class.getName());
+
+ private final Clock clock;
+
+ private final EvaluatorRequestor evaluatorRequestor;
+
+ /** Active contexts, keyed by evaluator ID and then by context ID. */
+ private final Map<String, Map<String, ActiveContext>> evaluatorId2ContextId2ContextMap = new HashMap<>();
+
+ /** Currently allocated evaluators, keyed by evaluator ID. */
+ private final Map<String, AllocatedEvaluator> evaluatorMap = new HashMap<>();
+
+ /** Failed evaluators, keyed by evaluator ID. */
+ private final Map<String, FailedEvaluator> failedEvaluatorMap = new HashMap<>();
+
+ /** Running tasks, keyed by the ID of the evaluator they run on. */
+ private final Map<String, RunningTask> evaluatorIdRunningTaskMap = new HashMap<>();
+
+ private final Set<FailedContext> failedContextSet = new HashSet<>();
+
+ private final Set<FailedTask> failedTaskSet = new HashSet<>();
+
+ private final Set<SuspendedTask> suspendedTaskSet = new HashSet<>();
+
+ /** Set by {@link StartHandler} and cleared by {@link StopHandler}. */
+ private boolean running = false;
+
+ @Inject
+ MockApplication(final Clock clock, final EvaluatorRequestor evaluatorRequestor) {
+ this.clock = clock;
+ this.evaluatorRequestor = evaluatorRequestor;
+ }
+
+ /**
+ * Look up an active context that was reported for the given evaluator.
+ * Throws a NullPointerException if no context was ever recorded for the evaluator.
+ * @param evaluator the evaluator the context lives on
+ * @param identifier the context identifier
+ * @return the recorded context, or null if the evaluator has no context with that identifier
+ */
+ ActiveContext getContext(final AllocatedEvaluator evaluator, final String identifier) {
+ return this.evaluatorId2ContextId2ContextMap.get(evaluator.getId()).get(identifier);
+ }
+
+ /**
+ * @return a read-only view of the tasks currently recorded as running
+ */
+ Collection<RunningTask> getRunningTasks() {
+ return Collections.unmodifiableCollection(this.evaluatorIdRunningTaskMap.values());
+ }
+
+ /**
+ * @return a read-only view of the evaluators currently recorded as allocated
+ */
+ Collection<AllocatedEvaluator> getAllocatedEvaluators() {
+ return Collections.unmodifiableCollection(this.evaluatorMap.values());
+ }
+
+ /**
+ * @return a read-only view of the evaluators recorded as failed
+ */
+ Collection<FailedEvaluator> getFailedEvaluators() {
+ return Collections.unmodifiableCollection(this.failedEvaluatorMap.values());
+ }
+
+ /**
+ * @return a read-only view of the tasks recorded as failed
+ */
+ Collection<FailedTask> getFailedTasks() {
+ return Collections.unmodifiableCollection(this.failedTaskSet);
+ }
+
+ /**
+ * @return a read-only view of the contexts recorded as failed
+ */
+ Collection<FailedContext> getFailedContext() {
+ return Collections.unmodifiableCollection(this.failedContextSet);
+ }
+
+ /**
+ * Submit a request for evaluators with 128 units of memory and one core each.
+ * @param numEvaluators the number of evaluators to request
+ */
+ void requestEvaluators(final int numEvaluators) {
+ LOG.log(Level.INFO, "request {0} Evaluators", numEvaluators);
+ evaluatorRequestor.newRequest()
+ .setMemory(128)
+ .setNumberOfCores(1)
+ .setNumber(numEvaluators)
+ .submit();
+ }
+
+ /**
+ * Submit a {@link DummyTestTask} to the given context.
+ * @param context the context to run the task on
+ * @param identifier the task identifier
+ */
+ void submitTask(final ActiveContext context, final String identifier) {
+ context.submitTask(TaskConfiguration.CONF
+ .set(TaskConfiguration.IDENTIFIER, identifier)
+ .set(TaskConfiguration.TASK, DummyTestTask.class)
+ .build());
+ }
+
+ /**
+ * Submit a context with the given identifier to the given evaluator.
+ * @param evaluator the evaluator to create the context on
+ * @param identifier the context identifier
+ */
+ void submitContext(final AllocatedEvaluator evaluator, final String identifier) {
+ evaluator.submitContext(ContextConfiguration.CONF
+ .set(ContextConfiguration.IDENTIFIER, identifier)
+ .build());
+ }
+
+ /**
+ * @return true between the start and stop events
+ */
+ boolean isRunning() {
+ return this.running;
+ }
+
+ /**
+ * @param evaluator the evaluator to look for
+ * @return true if the evaluator is currently recorded as allocated
+ */
+ boolean exists(final AllocatedEvaluator evaluator) {
+ return this.evaluatorMap.containsKey(evaluator.getId());
+ }
+
+ /**
+ * Job Driver is ready and the clock is set up: schedule a far-future alarm
+ * and mark the application as running.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ // NOTE(review): presumably this alarm only keeps the clock from going idle - confirm.
+ clock.scheduleAlarm(Integer.MAX_VALUE, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm value) {
+ throw new RuntimeException("should not happen");
+ }
+ });
+ running = true;
+ }
+ }
+
+ /**
+ * Job Driver is shutting down: mark the application as no longer running.
+ */
+ final class StopHandler implements EventHandler<StopTime> {
+ @Override
+ public void onNext(final StopTime stopTime) {
+ running = false;
+ }
+ }
+
+ /**
+ * Receive notification that an Evaluator had been allocated,
+ * and record it by its ID.
+ */
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator eval) {
+ evaluatorMap.put(eval.getId(), eval);
+ }
+ }
+
+ /**
+ * Receive notification that the Evaluator has been shut down,
+ * and forget the evaluator together with its contexts and running task.
+ */
+ final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
+ @Override
+ public void onNext(final CompletedEvaluator eval) {
+ evaluatorMap.remove(eval.getId());
+ evaluatorId2ContextId2ContextMap.remove(eval.getId());
+ evaluatorIdRunningTaskMap.remove(eval.getId());
+ }
+ }
+
+ /**
+ * Receive notification that the Evaluator has failed: forget its live state,
+ * then record the failed evaluator and the failed contexts it reports.
+ */
+ final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+
+ @Override
+ public void onNext(final FailedEvaluator eval) {
+ evaluatorMap.remove(eval.getId());
+ evaluatorId2ContextId2ContextMap.remove(eval.getId());
+ evaluatorIdRunningTaskMap.remove(eval.getId());
+ failedEvaluatorMap.put(eval.getId(), eval);
+ failedContextSet.addAll(eval.getFailedContextList());
+ }
+ }
+
+ /**
+ * Receive notification that the Context is active.
+ * Throws an IllegalStateException if a context with the same identifier
+ * is already recorded for the same evaluator.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(final ActiveContext context) {
+ if (!evaluatorId2ContextId2ContextMap.containsKey(context.getEvaluatorId())) {
+ evaluatorId2ContextId2ContextMap.put(context.getEvaluatorId(), new HashMap<String, ActiveContext>());
+ }
+ if (evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).containsKey(context.getId())) {
+ throw new IllegalStateException(
+ String.format("Context %s on evaluator %s already exists on evaluator with " +
+ "same identifier", context.getId(), context.getEvaluatorId()));
+ }
+ evaluatorId2ContextId2ContextMap.get(context.getEvaluatorId()).put(context.getId(), context);
+ }
+ }
+
+ /**
+ * Receive notification that the Context has been closed, and forget it.
+ */
+ final class ContextClosedHandler implements EventHandler<ClosedContext> {
+ @Override
+ public void onNext(final ClosedContext value) {
+ assert evaluatorId2ContextId2ContextMap.containsKey(value.getEvaluatorId());
+ assert evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).containsKey(value.getId());
+ evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId()).remove(value.getId());
+ }
+ }
+
+ /**
+ * Receive notification that the Context has failed: drop it from the
+ * active contexts (if it was ever recorded there) and record the failure.
+ */
+ final class FailedContextHandler implements EventHandler<FailedContext> {
+ @Override
+ public void onNext(final FailedContext value) {
+ final Map<String, ActiveContext> contexts =
+ evaluatorId2ContextId2ContextMap.get(value.getEvaluatorId());
+ if (contexts != null) {
+ // The inner map is keyed by context ID, not evaluator ID. Removing an absent
+ // key is a no-op, which covers a context that failed before it became active.
+ contexts.remove(value.getId());
+ }
+ failedContextSet.add(value);
+ }
+ }
+
+ /**
+ * Receive notification that the Task is running.
+ */
+ final class RunningTaskHandler implements EventHandler<RunningTask> {
+ @Override
+ public void onNext(final RunningTask task) {
+ evaluatorIdRunningTaskMap.put(task.getActiveContext().getEvaluatorId(), task);
+ }
+ }
+
+ /**
+ * Receive notification that the Task has completed successfully.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+ evaluatorIdRunningTaskMap.remove(task.getActiveContext().getEvaluatorId());
+ }
+ }
+
+ /**
+ * Receive notification that the Task has failed: forget the running task
+ * of its evaluator and record the failure.
+ */
+ final class FailedTaskHandler implements EventHandler<FailedTask> {
+ @Override
+ public void onNext(final FailedTask value) {
+ evaluatorIdRunningTaskMap.remove(value.getActiveContext().get().getEvaluatorId());
+ failedTaskSet.add(value);
+ }
+ }
+
+ /**
+ * Receive notification that the Task has been suspended: forget the running
+ * task of its evaluator and record the suspension.
+ */
+ final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+ @Override
+ public void onNext(final SuspendedTask value) {
+ evaluatorIdRunningTaskMap.remove(value.getActiveContext().getEvaluatorId());
+ suspendedTaskSet.add(value);
+ }
+ }
+
+ /**
+ * Task implementation referenced by the task configurations built in
+ * {@link #submitTask(ActiveContext, String)}; returns an empty byte array.
+ */
+ private static final class DummyTestTask implements Task {
+ @Override
+ public byte[] call(final byte[] memento) throws Exception {
+ return new byte[0];
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java
new file mode 100644
index 0000000..75f80b6
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/driver/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/**
+ * mock runtime tests.
+ */
+package org.apache.reef.mock.driver;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java b/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
deleted file mode 100644
index e93688c..0000000
--- a/lang/java/reef-runtime-mock/src/test/java/org/apache/reef/mock/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-/**
- * mock runtime tests.
- */
-package org.apache.reef.mock;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
index dc175e2..0f88c46 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRestartConfiguration.java
@@ -40,7 +40,7 @@ import org.apache.reef.tang.formats.OptionalImpl;
@Unstable
public final class YarnDriverRestartConfiguration extends ConfigurationModuleBuilder {
/**
- * The Evaluator Preserver implementation used for YARN. Defaults to DFSEvalutorPreserver.
+ * The Evaluator Preserver implementation used for YARN. Defaults to DFSEvaluatorPreserver.
*/
public static final OptionalImpl<EvaluatorPreserver> EVALUATOR_PRESERVER = new OptionalImpl<>();
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index bdf1779..9ba8d28 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -156,7 +156,7 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta
/**
* Used by {@link org.apache.reef.driver.restart.DriverRestartManager}.
* Gets the list of previous containers from the resource manager,
- * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evalutor preserver,
+ * compares that list to the YarnDriverRuntimeRestartManager's own list based on the evaluator preserver,
* and determine which evaluators are alive and which have failed during restart.
* @return a map of Evaluator ID to {@link EvaluatorRestartInfo} for evaluators that have either failed or survived
* driver restart.
[3/4] reef git commit: [REEF-2012] Add driver restart capabilities to
reef runtime mock
Posted by mo...@apache.org.
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
new file mode 100644
index 0000000..d90486a
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToContext.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.mock.driver.ProcessRequest;
+
+/**
+ * Process request describing a message sent from the driver to a context.
+ * It carries the target context and the message bytes; the success/failure
+ * event, auto-complete setter and completion request are not supported.
+ */
+@Unstable
+@Private
+public final class SendMessageDriverToContext implements
+ ProcessRequestInternal<Object, Object> {
+
+ private final ActiveContext context;
+
+ private final byte[] message;
+
+ /**
+ * @param context the context the message is addressed to
+ * @param message the message payload (stored by reference, not copied)
+ */
+ public SendMessageDriverToContext(final ActiveContext context, final byte[] message) {
+ this.context = context;
+ this.message = message;
+ }
+
+ /**
+ * @return the context the message is addressed to
+ */
+ public ActiveContext getContext() {
+ return context;
+ }
+
+ /**
+ * @return the message payload passed to the constructor
+ */
+ public byte[] getMessage() {
+ return message;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.SEND_MESSAGE_DRIVER_TO_CONTEXT;
+ }
+
+ /** This request never auto-completes. */
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public Object getSuccessEvent() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public Object getFailureEvent() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
new file mode 100644
index 0000000..bf6c3dd
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SendMessageDriverToTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.driver.ProcessRequest;
+
+/**
+ * Process request describing a message sent from the driver to a running task.
+ * It carries the target task and the message bytes; the success/failure
+ * event, auto-complete setter and completion request are not supported.
+ */
+@Unstable
+@Private
+public final class SendMessageDriverToTask implements
+ ProcessRequestInternal<Object, Object> {
+
+ // Assigned only in the constructor, so it is final like the other request classes' fields.
+ private final RunningTask task;
+
+ private final byte[] message;
+
+ /**
+ * @param task the running task the message is addressed to
+ * @param message the message payload (stored by reference, not copied)
+ */
+ public SendMessageDriverToTask(final RunningTask task, final byte[] message) {
+ this.task = task;
+ this.message = message;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.SEND_MESSAGE_DRIVER_TO_TASK;
+ }
+
+ /**
+ * @return the running task the message is addressed to
+ */
+ public RunningTask getTask() {
+ return task;
+ }
+
+ /**
+ * @return the message payload passed to the constructor
+ */
+ public byte[] getMessage() {
+ return message;
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public Object getSuccessEvent() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public Object getFailureEvent() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** This request never auto-completes. */
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
new file mode 100644
index 0000000..a4dbcee
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/SuspendTask.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.mock.driver.runtime.MockSuspendedTask;
+import org.apache.reef.util.Optional;
+
+/**
+ * Process request describing the suspension of a running task.
+ * Success yields a {@link MockSuspendedTask} for the task; failure yields a
+ * {@link FailedTask} built from the task's ID and active context.
+ */
+@Unstable
+@Private
+public final class SuspendTask implements ProcessRequestInternal<SuspendedTask, FailedTask> {
+
+ private final MockRunningTask task;
+
+ private final Optional<byte[]> message;
+
+ /**
+ * @param task the running task to suspend
+ * @param message optional message accompanying the suspend request
+ */
+ public SuspendTask(final MockRunningTask task, final Optional<byte[]> message) {
+ this.task = task;
+ this.message = message;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.SUSPEND_TASK;
+ }
+
+ /**
+ * @return the running task this request suspends
+ */
+ public MockRunningTask getTask() {
+ return this.task;
+ }
+
+ /**
+ * @return the optional message passed to the constructor
+ */
+ public Optional<byte[]> getMessage() {
+ return this.message;
+ }
+
+ /**
+ * @return a new suspended-task event wrapping the task
+ */
+ @Override
+ public MockSuspendedTask getSuccessEvent() {
+ final MockSuspendedTask suspended = new MockSuspendedTask(task);
+ return suspended;
+ }
+
+ /**
+ * @return a new failed-task event carrying the task ID, the message "mock",
+ * no description, cause or data, and the task's active context
+ */
+ @Override
+ public FailedTask getFailureEvent() {
+ final Optional<String> noDescription = Optional.<String>empty();
+ final Optional<Throwable> noCause = Optional.<Throwable>empty();
+ final Optional<byte[]> noData = Optional.<byte[]>empty();
+ return new FailedTask(
+ task.getId(),
+ "mock",
+ noDescription,
+ noCause,
+ noData,
+ Optional.of(task.getActiveContext()));
+ }
+
+ /** This request never auto-completes. */
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** Not supported for this request type. */
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
new file mode 100644
index 0000000..22dbfbc
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/**
+ * process request implementations.
+ */
+package org.apache.reef.mock.driver.request;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
new file mode 100644
index 0000000..292dd9d
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockActiveContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.mock.driver.request.CloseContext;
+import org.apache.reef.mock.driver.request.CreateContext;
+import org.apache.reef.mock.driver.request.CreateTask;
+import org.apache.reef.mock.driver.request.SendMessageDriverToContext;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+/**
+ * mock active context.
+ */
+@Unstable
+@Private
+public final class MockActiveContext implements ActiveContext {
+
+ private final MockRuntimeDriver mockRuntimeDriver;
+
+ private final MockAllocatedEvaluator evaluator;
+
+ private final Optional<MockActiveContext> parentContext;
+
+ private final String contextID;
+
+ MockActiveContext(
+ final MockRuntimeDriver mockRuntimeDriver,
+ final MockAllocatedEvaluator evaluator,
+ final Optional<MockActiveContext> parentContext,
+ final String contextID) {
+ this.mockRuntimeDriver = mockRuntimeDriver;
+ this.evaluator = evaluator;
+ this.parentContext = parentContext;
+ this.contextID = contextID;
+ }
+
+ @Override
+ public int hashCode() {
+ final String id = this.getEvaluatorId() + ":" + contextID;
+ return id.hashCode();
+ }
+
+ public boolean equals(final Object that) {
+ if (that instanceof MockActiveContext) {
+ return this.getEvaluatorId().equals(((MockActiveContext)that).getEvaluatorId()) &&
+ this.contextID.equals(((MockActiveContext)that).contextID);
+ }
+ return false;
+ }
+
+ public MockAllocatedEvaluator getEvaluator() {
+ return this.evaluator;
+ }
+
+ public Optional<MockActiveContext> getParentContext() {
+ return this.parentContext;
+ }
+
+ @Override
+ public void close() {
+ this.mockRuntimeDriver.add(new CloseContext(this));
+ }
+
+ @Override
+ public void submitTask(final Configuration taskConf) {
+ final String taskID = MockUtils.getValue(taskConf, TaskConfigurationOptions.Identifier.class);
+ final MockRunningTask task = new MockRunningTask(this.mockRuntimeDriver, taskID, this);
+ this.mockRuntimeDriver.add(new CreateTask(task, this.mockRuntimeDriver.getTaskReturnValueProvider()));
+ }
+
+ @Override
+ public void submitContext(final Configuration contextConfiguration) {
+ final String childContextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class);
+ final MockActiveContext context = new MockActiveContext(
+ this.mockRuntimeDriver,
+ this.evaluator,
+ Optional.of(this),
+ childContextID);
+ this.mockRuntimeDriver.add(new CreateContext(context));
+ }
+
+ @Override
+ public void submitContextAndService(
+ final Configuration contextConfiguration,
+ final Configuration serviceConfiguration) {
+ submitContext(contextConfiguration);
+ }
+
+ @Override
+ public void sendMessage(final byte[] message) {
+ this.mockRuntimeDriver.add(new SendMessageDriverToContext(this, message));
+ }
+
+ @Override
+ public String getEvaluatorId() {
+ return this.evaluator.getId();
+ }
+
+ @Override
+ public Optional<String> getParentId() {
+ return this.parentContext.isPresent() ?
+ Optional.of(this.parentContext.get().getId()) :
+ Optional.<String>empty();
+ }
+
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return this.evaluator.getEvaluatorDescriptor();
+ }
+
+ @Override
+ public String getId() {
+ return this.contextID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
new file mode 100644
index 0000000..d3c0581
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockAllocatedEvaluator.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.mock.driver.request.CloseEvaluator;
+import org.apache.reef.mock.driver.request.CreateContext;
+import org.apache.reef.mock.driver.request.CreateContextAndTask;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.util.Optional;
+
+import java.io.File;
+
+/**
+ * Mock allocated evaluator.
+ *
+ * Every submit/close call is turned into a request object that is added to the
+ * {@link MockRuntimeDriver}. An evaluator accepts exactly one root context.
+ */
+@Unstable
+@Private
+public final class MockAllocatedEvaluator implements AllocatedEvaluator {
+  /** Prefix of the root context ID generated by {@link #submitTask(Configuration)}. */
+  public static final String ROOT_CONTEXT_IDENTIFIER_PREFIX = "ROOT.CONTEXT.";
+
+  private final MockRuntimeDriver mockRuntimeDriver;
+
+  private final String identifier;
+
+  private final EvaluatorDescriptor evaluatorDescriptor;
+
+  /** Root context of this evaluator; null until one of the submit methods succeeds. */
+  private MockActiveContext rootContext = null;
+
+  /** Set once a close request has been added for this evaluator. */
+  private boolean closed = false;
+
+  MockAllocatedEvaluator(
+      final MockRuntimeDriver mockRuntimeDriver,
+      final String identifier,
+      final EvaluatorDescriptor evaluatorDescriptor) {
+    this.mockRuntimeDriver = mockRuntimeDriver;
+    this.identifier = identifier;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+  }
+
+  /**
+   * @return the root context, or null if no context or task has been submitted yet
+   */
+  public MockActiveContext getRootContext() {
+    return this.rootContext;
+  }
+
+  @Override
+  public void addFile(final File file) {
+    // ignore
+  }
+
+  @Override
+  public void addLibrary(final File file) {
+    // ignore
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setProcess(final EvaluatorProcess process) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Adds a {@link CloseEvaluator} request for this evaluator to the mock runtime driver.
+   *
+   * @throws IllegalStateException if close() was already called on this evaluator
+   */
+  @Override
+  public void close() {
+    if (this.closed) {
+      throw new IllegalStateException("evaluator already closed");
+    }
+    this.mockRuntimeDriver.add(new CloseEvaluator(this));
+    // Record the close so that the guard above can reject a repeated call.
+    this.closed = true;
+  }
+
+  /**
+   * Creates a root context named {@link #ROOT_CONTEXT_IDENTIFIER_PREFIX} plus the evaluator ID
+   * and submits it together with the task.
+   *
+   * @param taskConfiguration configuration from which the task identifier is read
+   * @throws IllegalStateException if a root context already exists
+   */
+  @Override
+  public void submitTask(final Configuration taskConfiguration) {
+    ensureNoRootContext();
+    // Read the task ID before touching rootContext: if reading it fails, this
+    // evaluator must not be left with a root context that was never submitted.
+    final String taskID = MockUtils.getValue(taskConfiguration, TaskConfigurationOptions.Identifier.class);
+    submitRootContextAndTask(ROOT_CONTEXT_IDENTIFIER_PREFIX + this.identifier, taskID);
+  }
+
+  /**
+   * Creates the root context and adds a {@link CreateContext} request for it.
+   *
+   * @param contextConfiguration configuration from which the {@link ContextIdentifier} value is read
+   * @throws IllegalStateException if a root context already exists
+   */
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    ensureNoRootContext();
+    final String rootContextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class);
+    this.mockRuntimeDriver.add(new CreateContext(createRootContext(rootContextID)));
+  }
+
+  @Override
+  public void submitContextAndService(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) {
+    submitContext(contextConfiguration);
+    // ignore services
+  }
+
+  /**
+   * Creates the root context and a task on it, and adds a single
+   * {@link CreateContextAndTask} request covering both.
+   *
+   * @param contextConfiguration configuration from which the {@link ContextIdentifier} value is read
+   * @param taskConfiguration configuration from which the task identifier is read
+   * @throws IllegalStateException if a root context already exists
+   */
+  @Override
+  public void submitContextAndTask(
+      final Configuration contextConfiguration,
+      final Configuration taskConfiguration) {
+    ensureNoRootContext();
+    final String contextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class);
+    final String taskID = MockUtils.getValue(taskConfiguration, TaskConfigurationOptions.Identifier.class);
+    submitRootContextAndTask(contextID, taskID);
+  }
+
+  @Override
+  public void submitContextAndServiceAndTask(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration,
+      final Configuration taskConfiguration) {
+    // ignore services
+    submitContextAndTask(contextConfiguration, taskConfiguration);
+  }
+
+  @Override
+  public String getId() {
+    return this.identifier;
+  }
+
+  /** Rejects a submit call when this evaluator already has a root context. */
+  private void ensureNoRootContext() {
+    if (this.rootContext != null) {
+      throw new IllegalStateException("Root context already created");
+    }
+  }
+
+  /** Creates the parentless root context with the given ID and remembers it. */
+  private MockActiveContext createRootContext(final String contextID) {
+    this.rootContext = new MockActiveContext(
+        this.mockRuntimeDriver,
+        this,
+        Optional.<MockActiveContext>empty(),
+        contextID);
+    return this.rootContext;
+  }
+
+  /** Creates the root context plus a task on it and adds the combined request to the driver. */
+  private void submitRootContextAndTask(final String contextID, final String taskID) {
+    final MockActiveContext root = createRootContext(contextID);
+    final MockRunningTask mockTask = new MockRunningTask(this.mockRuntimeDriver, taskID, root);
+    this.mockRuntimeDriver.add(
+        new CreateContextAndTask(
+            root,
+            mockTask,
+            this.mockRuntimeDriver.getTaskReturnValueProvider()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
new file mode 100644
index 0000000..297647a
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClock.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.mock.driver.MockRuntime;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.runtime.event.ClientAlarm;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * The MockClock can be used to drive alarms set by the client application.
+ * Mock time only changes when {@link #advanceClock(int)} is called.
+ */
+@Unstable
+@Private
+public final class MockClock implements Clock {
+
+  private final InjectionFuture<MockRuntime> runtime;
+
+  /** Alarms that were scheduled and have not fired yet, in scheduling order. */
+  private final List<Alarm> alarmList = new ArrayList<>();
+
+  private long currentTime = 0;
+
+  private boolean closed = false;
+
+  @Inject
+  MockClock(final InjectionFuture<MockRuntime> runtime) {
+    this.runtime = runtime;
+  }
+
+  /**
+   * Advances the clock by the offset amount and runs every pending alarm whose
+   * timestamp is not after the new current time.
+   *
+   * Due alarms are removed from the pending list before any of them runs, so an
+   * alarm handler may call {@link #scheduleAlarm(int, EventHandler)} safely.
+   * Alarms scheduled by a handler during this call stay pending until a later call.
+   *
+   * @param offset amount to advance clock
+   */
+  public void advanceClock(final int offset) {
+    this.currentTime += offset;
+
+    // Collect first, run afterwards: running a handler while iterating alarmList
+    // would fail with ConcurrentModificationException if the handler schedules a new alarm.
+    final List<Alarm> dueAlarms = new ArrayList<>();
+    final Iterator<Alarm> iter = this.alarmList.iterator();
+    while (iter.hasNext()) {
+      final Alarm alarm = iter.next();
+      if (alarm.getTimestamp() <= this.currentTime) {
+        dueAlarms.add(alarm);
+        iter.remove();
+      }
+    }
+
+    for (final Alarm alarm : dueAlarms) {
+      alarm.run();
+    }
+  }
+
+  /**
+   * @return the current mock clock time
+   */
+  public long getCurrentTime() {
+    return this.currentTime;
+  }
+
+  /**
+   * Registers an alarm that becomes due at the current mock time plus the offset.
+   *
+   * @param offset distance from the current mock time at which the alarm is due
+   * @param handler handler given to the created alarm
+   * @return the created alarm
+   */
+  @Override
+  public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
+    final Alarm alarm = new ClientAlarm(this.currentTime + offset, handler);
+    this.alarmList.add(alarm);
+    return alarm;
+  }
+
+  /**
+   * Stops the mock runtime and marks the clock closed; later calls do nothing.
+   */
+  @Override
+  public void close() {
+    if (!this.closed) {
+      this.runtime.get().stop();
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public void stop() {
+    close();
+  }
+
+  /**
+   * Same as {@link #close()}; the exception is not used.
+   */
+  @Override
+  public void stop(final Throwable exception) {
+    close();
+  }
+
+  /**
+   * @return true if no alarm is pending
+   */
+  @Override
+  public boolean isIdle() {
+    return this.alarmList.isEmpty();
+  }
+
+  @Override
+  public boolean isClosed() {
+    return this.closed;
+  }
+
+  /**
+   * Starts the mock runtime.
+   */
+  @Override
+  public void run() {
+    this.runtime.get().start();
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
new file mode 100644
index 0000000..d008373
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockClosedContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Mock closed context: a closed-state view over the {@link MockActiveContext} it wraps.
+ */
+@Unstable
+@Private
+public final class MockClosedContext implements ClosedContext {
+
+  private final MockActiveContext mockActiveContext;
+
+  public MockClosedContext(final MockActiveContext activeContext) {
+    this.mockActiveContext = activeContext;
+  }
+
+  /**
+   * @return the wrapped context
+   */
+  public MockActiveContext getMockActiveContext() {
+    return this.mockActiveContext;
+  }
+
+  /**
+   * @return the wrapped context's parent, or null when it has none
+   */
+  @Override
+  public ActiveContext getParentContext() {
+    final Optional<? extends ActiveContext> parent = this.mockActiveContext.getParentContext();
+    if (parent.isPresent()) {
+      return parent.get();
+    }
+    return null;
+  }
+
+  @Override
+  public String getId() {
+    final String id = this.mockActiveContext.getId();
+    return id;
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    final String evaluatorId = this.mockActiveContext.getEvaluatorId();
+    return evaluatorId;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    final Optional<String> parentId = this.mockActiveContext.getParentId();
+    return parentId;
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    final EvaluatorDescriptor descriptor = this.mockActiveContext.getEvaluatorDescriptor();
+    return descriptor;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
new file mode 100644
index 0000000..e141787
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockCompletedTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+
+/**
+ * Mock completed task: pairs a {@link MockRunningTask} with the bytes it returned.
+ */
+@Unstable
+@Private
+public final class MockCompletedTask implements CompletedTask {
+
+  private final MockRunningTask runningTask;
+
+  private final byte[] result;
+
+  /**
+   * @param task the task that completed
+   * @param returnValue the value handed back by {@link #get()}
+   */
+  public MockCompletedTask(final MockRunningTask task, final byte[] returnValue) {
+    this.runningTask = task;
+    this.result = returnValue;
+  }
+
+  /**
+   * @return the active context of the wrapped task
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    final ActiveContext activeContext = this.runningTask.getActiveContext();
+    return activeContext;
+  }
+
+  /**
+   * @return the ID of the wrapped task
+   */
+  @Override
+  public String getId() {
+    final String taskId = this.runningTask.getId();
+    return taskId;
+  }
+
+  /**
+   * @return the array passed to the constructor (not a copy)
+   */
+  @Override
+  public byte[] get() {
+    return this.result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
new file mode 100644
index 0000000..4341766
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorDescriptor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorProcess;
+
+/**
+ * Mock evaluator descriptor reporting fixed resource values.
+ */
+@Unstable
+@Private
+public final class MockEvaluatorDescriptor implements EvaluatorDescriptor {
+  /** Value reported by {@link #getMemory()}. */
+  private static final int MOCK_MEMORY = 0;
+
+  /** Value reported by {@link #getNumberOfCores()}. */
+  private static final int MOCK_NUMBER_OF_CORES = 1;
+
+  /** Value reported by {@link #getRuntimeName()}. */
+  private static final String MOCK_RUNTIME_NAME = "mock";
+
+  private final NodeDescriptor nodeDescriptor;
+
+  MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor) {
+    this.nodeDescriptor = nodeDescriptor;
+  }
+
+  @Override
+  public NodeDescriptor getNodeDescriptor() {
+    return this.nodeDescriptor;
+  }
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public EvaluatorProcess getProcess() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getMemory() {
+    return MOCK_MEMORY;
+  }
+
+  @Override
+  public int getNumberOfCores() {
+    return MOCK_NUMBER_OF_CORES;
+  }
+
+  @Override
+  public String getRuntimeName() {
+    return MOCK_RUNTIME_NAME;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
new file mode 100644
index 0000000..ec80422
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockEvaluatorRequestor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.mock.driver.request.AllocateEvaluator;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+import java.util.UUID;
+
+/**
+ * Mock evaluator requestor: turns each requested evaluator into an
+ * {@link AllocateEvaluator} request on the mock runtime driver.
+ */
+@Unstable
+@Private
+public final class MockEvaluatorRequestor implements EvaluatorRequestor {
+
+  private final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver;
+
+  private final InjectionFuture<MockClock> clock;
+
+  @Inject
+  MockEvaluatorRequestor(
+      final InjectionFuture<MockClock> clock,
+      final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver) {
+    this.clock = clock;
+    this.mockRuntimeDriver = mockRuntimeDriver;
+  }
+
+  /**
+   * Adds one {@link AllocateEvaluator} request per evaluator asked for. Each mock
+   * evaluator gets a random UUID as its ID; all of them share one descriptor.
+   *
+   * @param req the request; only its number of evaluators is used
+   * @throws IllegalStateException if the clock is closed
+   */
+  @Override
+  public void submit(final EvaluatorRequest req) {
+    ensureClockOpen();
+    final NodeDescriptor node = new MockNodeDescriptor();
+    final MockEvaluatorDescriptor sharedDescriptor = new MockEvaluatorDescriptor(node);
+    for (int created = 0; created < req.getNumber(); created++) {
+      final String evaluatorId = UUID.randomUUID().toString();
+      final MockAllocatedEvaluator evaluator =
+          new MockAllocatedEvaluator(this.mockRuntimeDriver.get(), evaluatorId, sharedDescriptor);
+      this.mockRuntimeDriver.get().add(new AllocateEvaluator(evaluator));
+    }
+  }
+
+  /**
+   * @return a fresh request builder bound to this requestor
+   * @throws IllegalStateException if the clock is closed
+   */
+  @Override
+  public Builder newRequest() {
+    ensureClockOpen();
+    return new Builder();
+  }
+
+  /** Rejects the call when the mock clock has been closed. */
+  private void ensureClockOpen() {
+    if (this.clock.get().isClosed()) {
+      throw new IllegalStateException("clock closed");
+    }
+  }
+
+  /**
+   * {@link EvaluatorRequest.Builder} extended with a new submit method.
+   * {@link EvaluatorRequest}s are built using this builder.
+   */
+  private final class Builder extends EvaluatorRequest.Builder<Builder> {
+    @Override
+    public void submit() {
+      final EvaluatorRequest request = this.build();
+      MockEvaluatorRequestor.this.submit(request);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
new file mode 100644
index 0000000..d57cc2c
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Mock failed context: a failed-state view over a {@link MockActiveContext}.
+ * Failure details are fixed placeholder values.
+ */
+@Unstable
+@Private
+public final class MockFailedContext implements FailedContext {
+
+  /** Text used for both the failure message and the error returned by {@link #asError()}. */
+  private static final String MOCK_MESSAGE = "mock";
+
+  private final MockActiveContext context;
+
+  public MockFailedContext(final MockActiveContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the wrapped context's parent, or an empty optional when it has none
+   */
+  @Override
+  public Optional<ActiveContext> getParentContext() {
+    final Optional<? extends ActiveContext> parent = this.context.getParentContext();
+    if (!parent.isPresent()) {
+      return Optional.empty();
+    }
+    return Optional.<ActiveContext>of(parent.get());
+  }
+
+  @Override
+  public String getMessage() {
+    return MOCK_MESSAGE;
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<Throwable> getReason() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<byte[]> getData() {
+    return Optional.empty();
+  }
+
+  /**
+   * @return a new exception on every call
+   */
+  @Override
+  public Throwable asError() {
+    return new Exception(MOCK_MESSAGE);
+  }
+
+  @Override
+  public String getEvaluatorId() {
+    final String evaluatorId = this.context.getEvaluatorId();
+    return evaluatorId;
+  }
+
+  @Override
+  public Optional<String> getParentId() {
+    final Optional<String> parentId = this.context.getParentId();
+    return parentId;
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    final EvaluatorDescriptor descriptor = this.context.getEvaluatorDescriptor();
+    return descriptor;
+  }
+
+  @Override
+  public String getId() {
+    final String id = this.context.getId();
+    return id;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
new file mode 100644
index 0000000..aed9c24
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockFailedEvaluator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mock failed evaluator carrying the failed contexts and, optionally, the failed task.
+ */
+@Unstable
+@Private
+public final class MockFailedEvaluator implements FailedEvaluator {
+
+  private final String evaluatorID;
+
+  private final List<FailedContext> failedContextList;
+
+  private final Optional<FailedTask> failedTask;
+
+  /**
+   * Creates a failed evaluator without failed contexts or a failed task.
+   *
+   * @param evaluatorID ID of the evaluator that failed
+   */
+  public MockFailedEvaluator(final String evaluatorID) {
+    this(evaluatorID, new ArrayList<FailedContext>(), Optional.<FailedTask>empty());
+  }
+
+  /**
+   * @param evaluatorID ID of the evaluator that failed
+   * @param failedContextList contexts reported by {@link #getFailedContextList()}; stored as given
+   * @param failedTask task reported by {@link #getFailedTask()}
+   */
+  public MockFailedEvaluator(
+      final String evaluatorID,
+      final List<FailedContext> failedContextList,
+      final Optional<FailedTask> failedTask) {
+    this.evaluatorID = evaluatorID;
+    this.failedContextList = failedContextList;
+    this.failedTask = failedTask;
+  }
+
+  /**
+   * @return always null; the mock carries no evaluator exception
+   */
+  @Override
+  public EvaluatorException getEvaluatorException() {
+    return null;
+  }
+
+  @Override
+  public List<FailedContext> getFailedContextList() {
+    return this.failedContextList;
+  }
+
+  @Override
+  public Optional<FailedTask> getFailedTask() {
+    return this.failedTask;
+  }
+
+  @Override
+  public String getId() {
+    return this.evaluatorID;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
new file mode 100644
index 0000000..8278a29
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockNodeDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mock node descriptor. Its name, its ID and its rack's name are all "mock".
+ */
+@Unstable
+@Private
+public final class MockNodeDescriptor implements NodeDescriptor {
+
+  /** Value reported as node name, node ID and rack name. */
+  private static final String MOCK = "mock";
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @return a new rack descriptor whose only node is this descriptor
+   */
+  @Override
+  public RackDescriptor getRackDescriptor() {
+    return new SingleNodeRack();
+  }
+
+  @Override
+  public String getName() {
+    return MOCK;
+  }
+
+  @Override
+  public String getId() {
+    return MOCK;
+  }
+
+  /**
+   * Rack that contains exactly the enclosing node descriptor.
+   */
+  private final class SingleNodeRack implements RackDescriptor {
+    /**
+     * @return a new mutable list holding the enclosing node descriptor
+     */
+    @Override
+    public List<NodeDescriptor> getNodes() {
+      final List<NodeDescriptor> members = new ArrayList<>();
+      members.add(MockNodeDescriptor.this);
+      return members;
+    }
+
+    @Override
+    public String getName() {
+      return MOCK;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
new file mode 100644
index 0000000..711635a
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRunningTask.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.driver.request.CloseTask;
+import org.apache.reef.mock.driver.request.SendMessageDriverToTask;
+import org.apache.reef.mock.driver.request.SuspendTask;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.util.Optional;
+
+/**
+ * Mock running task. Send, suspend and close calls are turned into request
+ * objects that are added to the {@link MockRuntimeDriver}.
+ */
+@Unstable
+@Private
+public final class MockRunningTask implements RunningTask {
+
+  private final MockRuntimeDriver mockRuntimeDriver;
+
+  private final String taskID;
+
+  private final ActiveContext context;
+
+  MockRunningTask(
+      final MockRuntimeDriver mockRuntimeDriver,
+      final String taskID,
+      final ActiveContext context) {
+    this.mockRuntimeDriver = mockRuntimeDriver;
+    this.taskID = taskID;
+    this.context = context;
+  }
+
+  /**
+   * @return the evaluator ID reported by this task's context
+   */
+  public String evaluatorID() {
+    final String evaluatorId = this.context.getEvaluatorId();
+    return evaluatorId;
+  }
+
+  @Override
+  public ActiveContext getActiveContext() {
+    return this.context;
+  }
+
+  /**
+   * Adds a {@link SendMessageDriverToTask} request carrying the message.
+   *
+   * @param message payload carried by the request
+   */
+  @Override
+  public void send(final byte[] message) {
+    final SendMessageDriverToTask request = new SendMessageDriverToTask(this, message);
+    this.mockRuntimeDriver.add(request);
+  }
+
+  /**
+   * Adds a {@link SuspendTask} request carrying the message.
+   *
+   * @param message payload wrapped with {@code Optional.of} and carried by the request
+   */
+  @Override
+  public void suspend(final byte[] message) {
+    final Optional<byte[]> payload = Optional.of(message);
+    this.mockRuntimeDriver.add(new SuspendTask(this, payload));
+  }
+
+  /**
+   * Adds a {@link SuspendTask} request without a message.
+   */
+  @Override
+  public void suspend() {
+    final Optional<byte[]> noPayload = Optional.empty();
+    this.mockRuntimeDriver.add(new SuspendTask(this, noPayload));
+  }
+
+  /**
+   * Same as {@link #close()}; the message is not part of the request.
+   *
+   * @param message ignored
+   */
+  @Override
+  public void close(final byte[] message) {
+    close();
+  }
+
+  /**
+   * Adds a {@link CloseTask} request built with the driver's task return value provider.
+   */
+  @Override
+  public void close() {
+    final CloseTask request = new CloseTask(this, this.mockRuntimeDriver.getTaskReturnValueProvider());
+    this.mockRuntimeDriver.add(request);
+  }
+
+  /**
+   * Not supported by the mock.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public TaskRepresenter getTaskRepresenter() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getId() {
+    return this.taskID;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
new file mode 100644
index 0000000..42ea28d
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockRuntimeDriver.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.mock.driver.MockDriverRestartContext;
+import org.apache.reef.mock.driver.MockRuntime;
+import org.apache.reef.mock.driver.MockTaskReturnValueProvider;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.request.*;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * mock runtime driver.
+ */
+@Unstable
+@Private
+public final class MockRuntimeDriver implements MockRuntime {
+
+ /** Mock clock, resolved lazily through its injection future. */
+ private final InjectionFuture<MockClock> clock;
+
+ /** Process requests waiting to be handed out; new requests are appended at the end. */
+ private final List<ProcessRequest> processRequestQueue = new ArrayList<>();
+
+ // Driver event handlers, one set per event type.
+
+ private final Set<EventHandler<StartTime>> driverStartHandlers;
+
+ private final Set<EventHandler<StopTime>> driverStopHandlers;
+
+ private final Set<EventHandler<AllocatedEvaluator>> allocatedEvaluatorHandlers;
+
+ private final Set<EventHandler<CompletedEvaluator>> completedEvaluatorHandlers;
+
+ private final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers;
+
+ /** Handlers of running-task events (typed by the event, not by the named parameter class). */
+ private final Set<EventHandler<RunningTask>> taskRunningHandlers;
+
+ private final Set<EventHandler<FailedTask>> taskFailedHandlers;
+
+ private final Set<EventHandler<TaskMessage>> taskMessageHandlers;
+
+ private final Set<EventHandler<CompletedTask>> taskCompletedHandlers;
+
+ private final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers;
+
+ private final Set<EventHandler<ActiveContext>> contextActiveHandlers;
+
+ /**
+  * Handlers of closed-context events. The event type is written fully qualified because
+  * this file's imports bring in the CloseContext request class, not the ClosedContext event.
+  */
+ private final Set<EventHandler<org.apache.reef.driver.context.ClosedContext>> contextClosedHandlers;
+
+ private final Set<EventHandler<ContextMessage>> contextMessageHandlers;
+
+ private final Set<EventHandler<FailedContext>> contextFailedHandlers;
+
+ // Driver restart event handlers.
+
+ private final Set<EventHandler<DriverRestarted>> driverRestartHandlers;
+
+ private final Set<EventHandler<RunningTask>> driverRestartRunningTaskHandlers;
+
+ private final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers;
+
+ private final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers;
+
+ private final Set<EventHandler<FailedEvaluator>> driverRestartFailedEvaluatorHandlers;
+
+ // Tracked runtime state.
+
+ /** Allocated evaluators keyed by evaluator id. */
+ private final Map<String, MockAllocatedEvaluator> allocatedEvaluatorMap = new HashMap<>();
+
+ /** Active contexts per evaluator id, in creation order (the last element is the top context). */
+ private final Map<String, List<MockActiveContext>> allocatedContextsMap = new HashMap<>();
+
+ /** Running tasks keyed by the id of the evaluator they run on. */
+ private final Map<String, MockRunningTask> runningTasks = new HashMap<>();
+
+ private final MockTaskReturnValueProvider taskReturnValueProvider;
+
+ /**
+  * Injectable constructor; stores the clock future, the task return value provider and
+  * one handler set per driver event type.
+  */
+ @Inject
+ MockRuntimeDriver(
+     final InjectionFuture<MockClock> clock,
+     final MockTaskReturnValueProvider taskReturnValueProvider,
+     @Parameter(DriverStartHandler.class) final Set<EventHandler<StartTime>> driverStartHandlers,
+     @Parameter(Clock.StopHandler.class) final Set<EventHandler<StopTime>> driverStopHandlers,
+     @Parameter(EvaluatorAllocatedHandlers.class) final Set<EventHandler<AllocatedEvaluator>>
+         allocatedEvaluatorHandlers,
+     @Parameter(EvaluatorCompletedHandlers.class) final Set<EventHandler<CompletedEvaluator>>
+         completedEvaluatorHandlers,
+     @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers,
+     @Parameter(TaskRunningHandlers.class) final Set<EventHandler<RunningTask>> taskRunningHandlers,
+     @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskFailedHandlers,
+     @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageHandlers,
+     @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+     @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+     @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers,
+     @Parameter(ContextClosedHandlers.class)
+     final Set<EventHandler<org.apache.reef.driver.context.ClosedContext>>
+         contextClosedHandlers,
+     @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers,
+     @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers,
+     @Parameter(DriverRestartHandler.class) final Set<EventHandler<DriverRestarted>>
+         driverRestartHandlers,
+     @Parameter(DriverRestartTaskRunningHandlers.class) final Set<EventHandler<RunningTask>>
+         driverRestartRunningTaskHandlers,
+     @Parameter(DriverRestartContextActiveHandlers.class) final Set<EventHandler<ActiveContext>>
+         driverRestartActiveContextHandlers,
+     @Parameter(DriverRestartCompletedHandlers.class) final Set<EventHandler<DriverRestartCompleted>>
+         driverRestartCompletedHandlers,
+     @Parameter(DriverRestartFailedEvaluatorHandlers.class) final Set<EventHandler<FailedEvaluator>>
+         driverRestartFailedEvaluatorHandlers) {
+   this.clock = clock;
+   this.taskReturnValueProvider = taskReturnValueProvider;
+   this.driverStartHandlers = driverStartHandlers;
+   this.driverStopHandlers = driverStopHandlers;
+   this.allocatedEvaluatorHandlers = allocatedEvaluatorHandlers;
+   this.completedEvaluatorHandlers = completedEvaluatorHandlers;
+   this.failedEvaluatorHandlers = failedEvaluatorHandlers;
+   this.taskRunningHandlers = taskRunningHandlers;
+   this.taskFailedHandlers = taskFailedHandlers;
+   this.taskMessageHandlers = taskMessageHandlers;
+   this.taskCompletedHandlers = taskCompletedHandlers;
+   this.taskSuspendedHandlers = taskSuspendedHandlers;
+   this.contextActiveHandlers = contextActiveHandlers;
+   this.contextClosedHandlers = contextClosedHandlers;
+   this.contextMessageHandlers = contextMessageHandlers;
+   this.contextFailedHandlers = contextFailedHandlers;
+   this.driverRestartHandlers = driverRestartHandlers;
+   this.driverRestartRunningTaskHandlers = driverRestartRunningTaskHandlers;
+   this.driverRestartActiveContextHandlers = driverRestartActiveContextHandlers;
+   this.driverRestartCompletedHandlers = driverRestartCompletedHandlers;
+   this.driverRestartFailedEvaluatorHandlers = driverRestartFailedEvaluatorHandlers;
+ }
+
+ /**
+  * @return a new collection holding the evaluators currently tracked as allocated
+  * @throws IllegalStateException if the mock clock is closed
+  */
+ @Override
+ public Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators() {
+   final MockClock mockClock = this.clock.get();
+   if (mockClock.isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final List<AllocatedEvaluator> snapshot = new ArrayList<>();
+   snapshot.addAll(this.allocatedEvaluatorMap.values());
+   return snapshot;
+ }
+
+ /**
+  * Fails an allocated evaluator: stops tracking the evaluator, the task running on it
+  * (if any) and all of its contexts, then posts a failed-evaluator event that carries
+  * the failed contexts and the failed task.
+  *
+  * @param evaluator the evaluator to fail
+  * @throws IllegalStateException if the mock clock is closed or the evaluator is not tracked
+  */
+ @Override
+ public void fail(final AllocatedEvaluator evaluator) {
+   if (this.clock.get().isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final String evaluatorId = evaluator.getId();
+   if (this.allocatedEvaluatorMap.remove(evaluatorId) == null) {
+     throw new IllegalStateException("unknown evaluator " + evaluator);
+   }
+
+   // Running tasks are keyed by evaluator id; one remove both finds and untracks the task.
+   final RunningTask task = this.runningTasks.remove(evaluatorId);
+   FailedTask failedTask = null;
+   if (task != null) {
+     failedTask = new FailedTask(
+         task.getId(),
+         "mock",
+         Optional.<String>empty(),
+         Optional.<Throwable>empty(),
+         Optional.<byte[]>empty(),
+         Optional.<ActiveContext>of(task.getActiveContext()));
+   }
+
+   final List<FailedContext> failedContexts = new ArrayList<>();
+   final List<MockActiveContext> contexts = this.allocatedContextsMap.get(evaluatorId);
+   if (contexts != null) {
+     for (final MockActiveContext context : contexts) {
+       failedContexts.add(new MockFailedContext(context));
+     }
+     // The evaluator is gone, so its contexts must no longer be reported as active.
+     // The emptied list stays registered so lookups by evaluator id never see null.
+     contexts.clear();
+   }
+
+   post(this.failedEvaluatorHandlers, new MockFailedEvaluator(
+       evaluatorId, failedContexts, Optional.ofNullable(failedTask)));
+ }
+
+ /**
+  * @return a new collection with every context currently tracked, across all evaluators
+  * @throws IllegalStateException if the mock clock is closed
+  */
+ @Override
+ public Collection<ActiveContext> getCurrentActiveContexts() {
+   if (this.clock.get().isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final List<ActiveContext> active = new ArrayList<>();
+   for (final Map.Entry<String, List<MockActiveContext>> entry : this.allocatedContextsMap.entrySet()) {
+     for (final MockActiveContext context : entry.getValue()) {
+       active.add(context);
+     }
+   }
+   return active;
+ }
+
+ /**
+  * Fails an active context. If the context has no parent (a root context), its evaluator
+  * is untracked and a completed-evaluator event is posted first. The context is then
+  * removed from its evaluator's context list and a failed-context event is posted.
+  *
+  * @param context the context to fail; must be a {@code MockActiveContext}
+  * @throws IllegalStateException if the mock clock is closed
+  */
+ @Override
+ public void fail(final ActiveContext context) {
+   if (this.clock.get().isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+
+   final MockActiveContext mockContext = (MockActiveContext) context;
+   final MockAllocatedEvaluator evaluator = mockContext.getEvaluator();
+   final boolean isRootContext = !mockContext.getParentContext().isPresent();
+
+   if (isRootContext) {
+     // Root context failure shuts evaluator down.
+     this.allocatedEvaluatorMap.remove(evaluator.getId());
+     final CompletedEvaluator completedEvaluator = new CompletedEvaluator() {
+       @Override
+       public String getId() {
+         return evaluator.getId();
+       }
+     };
+     post(this.completedEvaluatorHandlers, completedEvaluator);
+   }
+
+   final List<MockActiveContext> contextsOnEvaluator = this.allocatedContextsMap.get(evaluator.getId());
+   contextsOnEvaluator.remove(context);
+   post(this.contextFailedHandlers, new MockFailedContext(mockContext));
+ }
+
+ /**
+  * @return a new collection holding the tasks currently tracked as running
+  * @throws IllegalStateException if the mock clock is closed
+  */
+ @Override
+ public Collection<RunningTask> getCurrentRunningTasks() {
+   final MockClock mockClock = this.clock.get();
+   if (mockClock.isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final List<RunningTask> snapshot = new ArrayList<>();
+   snapshot.addAll(this.runningTasks.values());
+   return snapshot;
+ }
+
+ /**
+  * Fails a running task: untracks it and posts a failed-task event.
+  *
+  * @param task the task to fail
+  * @throws IllegalStateException if the mock clock is closed, or if the task is not the one
+  *     tracked for its evaluator
+  */
+ @Override
+ public void fail(final RunningTask task) {
+   if (this.clock.get().isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final String evaluatorID = task.getActiveContext().getEvaluatorId();
+   final MockRunningTask tracked = this.runningTasks.get(evaluatorID);
+   if (tracked == null || !tracked.equals(task)) {
+     throw new IllegalStateException("unknown running task " + task);
+   }
+   this.runningTasks.remove(evaluatorID);
+   final FailedTask failedTask = new FailedTask(
+       task.getId(),
+       "mock",
+       Optional.<String>empty(),
+       Optional.<Throwable>empty(),
+       Optional.<byte[]>empty(),
+       Optional.of(task.getActiveContext()));
+   post(taskFailedHandlers, failedTask);
+ }
+
+ /**
+  * Posts a start-time event, stamped with the mock clock's current time, to the
+  * driver start handlers.
+  */
+ @Override
+ public void start() {
+   final StartTime startEvent = new StartTime(this.clock.get().getCurrentTime());
+   post(this.driverStartHandlers, startEvent);
+ }
+
+ /**
+  * Posts a stop-time event, stamped with the mock clock's current time, to the
+  * driver stop handlers.
+  */
+ @Override
+ public void stop() {
+   final StopTime stopEvent = new StopTime(this.clock.get().getCurrentTime());
+   post(this.driverStopHandlers, stopEvent);
+ }
+
+ /**
+  * Replays a driver restart from the given restart context. Events are posted in this
+  * order: driver restarted, each running task, each idle active context, restart
+  * completed, and finally each failed evaluator.
+  *
+  * @param restartContext source of the restart events
+  * @param isTimeout passed through to the restart-completed event
+  * @param duration passed through to the restart-completed event
+  */
+ @Override
+ public void restart(final MockDriverRestartContext restartContext, final boolean isTimeout, final long duration) {
+   post(this.driverRestartHandlers, restartContext.getDriverRestarted());
+
+   for (final RunningTask task : restartContext.getRunningTasks()) {
+     post(this.driverRestartRunningTaskHandlers, task);
+   }
+
+   for (final ActiveContext idleContext : restartContext.getIdleActiveContexts()) {
+     post(this.driverRestartActiveContextHandlers, idleContext);
+   }
+
+   post(this.driverRestartCompletedHandlers, restartContext.getDriverRestartCompleted(isTimeout, duration));
+
+   for (final FailedEvaluator failed : restartContext.getFailedEvaluators()) {
+     post(this.driverRestartFailedEvaluatorHandlers, failed);
+   }
+ }
+
+ /**
+  * Captures the tracked state in a restart context: all allocated evaluators, the last
+  * (top-most) context of every evaluator that has at least one context, and all running tasks.
+  *
+  * @param attempt passed through to the restart context
+  * @param startTime passed through to the restart context
+  * @return the restart context built from the current state
+  */
+ @Override
+ public MockDriverRestartContext failDriver(final int attempt, final StartTime startTime) {
+   final List<MockActiveContext> topContexts = new ArrayList<>();
+   for (final List<MockActiveContext> stack : this.allocatedContextsMap.values()) {
+     if (stack.isEmpty()) {
+       continue;
+     }
+     final int top = stack.size() - 1;
+     topContexts.add(stack.get(top));
+   }
+   return new MockDriverRestartContext(
+       attempt,
+       startTime,
+       new ArrayList<>(this.allocatedEvaluatorMap.values()),
+       topContexts,
+       new ArrayList<>(this.runningTasks.values()));
+ }
+
+ /**
+  * @return true if at least one process request is waiting in the queue
+  */
+ @Override
+ public boolean hasProcessRequest() {
+   return !this.processRequestQueue.isEmpty();
+ }
+
+ /**
+  * Removes and returns the request at the front of the queue.
+  *
+  * @return the oldest queued request, or null if the queue is empty
+  */
+ @Override
+ public ProcessRequest getNextProcessRequest() {
+   if (this.processRequestQueue.isEmpty()) {
+     return null;
+   }
+   return this.processRequestQueue.remove(0);
+ }
+
+ /**
+  * Marks a process request as successful: updates the tracked evaluator, context and task
+  * state for the request type, then posts the success event to the matching handlers.
+  * Driver-to-task and driver-to-context message requests are ignored. Afterwards an
+  * auto-completing request gets its completion request queued; otherwise the clock is
+  * closed if the runtime has become idle.
+  *
+  * @param pr the request to succeed; must be a {@code ProcessRequestInternal}
+  * @throws IllegalStateException if the mock clock is closed, the request type is unknown,
+  *     or the request is inconsistent with the tracked state
+  */
+ @Override
+ public void succeed(final ProcessRequest pr) {
+   if (this.clock.get().isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final ProcessRequestInternal request = (ProcessRequestInternal) pr;
+   switch (request.getType()) {
+   case ALLOCATE_EVALUATOR:
+     final MockAllocatedEvaluator allocatedEvaluator = ((AllocateEvaluator)request).getSuccessEvent();
+     validateAndCreate(allocatedEvaluator);
+     post(this.allocatedEvaluatorHandlers, allocatedEvaluator);
+     break;
+   case CLOSE_EVALUATOR:
+     final CompletedEvaluator closedEvaluator = ((CloseEvaluator)request).getSuccessEvent();
+     validateAndClose(closedEvaluator);
+     post(this.completedEvaluatorHandlers, closedEvaluator);
+     break;
+   case CREATE_CONTEXT:
+     final MockActiveContext createContext = ((CreateContext) request).getSuccessEvent();
+     validateAndCreate(createContext);
+     post(this.contextActiveHandlers, createContext);
+     break;
+   case CLOSE_CONTEXT:
+     final MockClosedContext closeContext = ((CloseContext) request).getSuccessEvent();
+     validateAndClose(closeContext);
+     post(this.contextClosedHandlers, closeContext);
+     break;
+   case CREATE_TASK:
+     final MockRunningTask createTask = ((CreateTask)request).getSuccessEvent();
+     validateAndCreate(createTask);
+     post(this.taskRunningHandlers, request.getSuccessEvent());
+     break;
+   case SUSPEND_TASK:
+     final MockRunningTask suspendedTask = ((SuspendTask)request).getTask();
+     validateAndClose(suspendedTask);
+     post(this.taskSuspendedHandlers, request.getSuccessEvent());
+     break;
+   case CLOSE_TASK:
+     // Close requests are created as CloseTask (e.g. by MockRunningTask.close()), so they
+     // get their own cast instead of sharing the CompleteTask cast used for COMPLETE_TASK.
+     validateAndClose(((CloseTask)request).getTask());
+     post(this.taskCompletedHandlers, request.getSuccessEvent());
+     break;
+   case COMPLETE_TASK:
+     final MockRunningTask completedTask = ((CompleteTask)request).getTask();
+     validateAndClose(completedTask);
+     post(this.taskCompletedHandlers, request.getSuccessEvent());
+     break;
+   case CREATE_CONTEXT_AND_TASK:
+     final CreateContextAndTask createContextTask = (CreateContextAndTask) request;
+     final Tuple<MockActiveContext, MockRunningTask> events = createContextTask.getSuccessEvent();
+     // The context is registered and announced before the task that runs on it.
+     validateAndCreate(events.getKey());
+     post(this.contextActiveHandlers, events.getKey());
+     validateAndCreate(events.getValue());
+     post(this.taskRunningHandlers, events.getValue());
+     break;
+   case SEND_MESSAGE_DRIVER_TO_TASK:
+     // ignore
+     break;
+   case SEND_MESSAGE_DRIVER_TO_CONTEXT:
+     // ignore
+     break;
+   default:
+     throw new IllegalStateException("unknown type");
+   }
+
+   if (request.doAutoComplete()) {
+     add(request.getCompletionProcessRequest());
+   } else if (!this.clock.get().isClosed() && isIdle()) {
+     // Re-check the clock: handlers invoked above may already have closed it.
+     this.clock.get().close();
+   }
+ }
+
+ /**
+  * Marks a process request as failed: for close/suspend/complete requests the affected
+  * entity is untracked first, then the failure event is posted to the matching failure
+  * handlers. Driver-to-task and driver-to-context message requests are ignored. Finally
+  * the clock is closed if the runtime has become idle.
+  *
+  * @param pr the request to fail; must be a {@code ProcessRequestInternal}
+  * @throws IllegalStateException if the mock clock is closed, the request type is unknown,
+  *     or the request is inconsistent with the tracked state
+  */
+ @Override
+ public void fail(final ProcessRequest pr) {
+   if (this.clock.get().isClosed()) {
+     throw new IllegalStateException("clock is closed");
+   }
+   final ProcessRequestInternal request = (ProcessRequestInternal) pr;
+   switch (request.getType()) {
+   case ALLOCATE_EVALUATOR:
+     post(this.failedEvaluatorHandlers, request.getFailureEvent());
+     break;
+   case CLOSE_EVALUATOR:
+     final CompletedEvaluator evaluator = ((CloseEvaluator)request).getSuccessEvent();
+     validateAndClose(evaluator);
+     post(this.failedEvaluatorHandlers, request.getFailureEvent());
+     break;
+   case CREATE_CONTEXT:
+     post(this.contextFailedHandlers, request.getFailureEvent());
+     break;
+   case CLOSE_CONTEXT:
+     final MockClosedContext context = ((CloseContext)request).getSuccessEvent();
+     validateAndClose(context);
+     // NOTE(review): a null parent is treated as "root context"; confirm that
+     // MockClosedContext.getParentContext() really returns null for root contexts.
+     if (context.getParentContext() == null) {
+       add(new CloseEvaluator(context.getMockActiveContext().getEvaluator()));
+     }
+     post(this.contextFailedHandlers, request.getFailureEvent());
+     break;
+   case CREATE_TASK:
+     post(this.taskFailedHandlers, request.getFailureEvent());
+     break;
+   case SUSPEND_TASK:
+     validateAndClose(((SuspendTask)request).getTask());
+     post(this.taskFailedHandlers, request.getFailureEvent());
+     break;
+   case CLOSE_TASK:
+     validateAndClose(((CloseTask)request).getTask());
+     post(this.taskFailedHandlers, request.getFailureEvent());
+     break;
+   case COMPLETE_TASK:
+     // Completion requests are CompleteTask instances, so they get their own cast
+     // instead of sharing the CloseTask cast used for CLOSE_TASK.
+     validateAndClose(((CompleteTask)request).getTask());
+     post(this.taskFailedHandlers, request.getFailureEvent());
+     break;
+   case CREATE_CONTEXT_AND_TASK:
+     final CreateContextAndTask createContextTask = (CreateContextAndTask) request;
+     final Tuple<MockFailedContext, FailedTask> events = createContextTask.getFailureEvent();
+     // The task failure is announced before the failure of the context it was meant to run on.
+     post(this.taskFailedHandlers, events.getValue());
+     post(this.contextFailedHandlers, events.getKey());
+     break;
+   case SEND_MESSAGE_DRIVER_TO_TASK:
+     // ignore
+     break;
+   case SEND_MESSAGE_DRIVER_TO_CONTEXT:
+     // ignore
+     break;
+   default:
+     throw new IllegalStateException("unknown type");
+   }
+
+   // Re-check the clock: handlers invoked above may already have closed it.
+   if (!this.clock.get().isClosed() && isIdle()) {
+     this.clock.get().close();
+   }
+ }
+
+ /**
+  * Delivers a context message to every registered context message handler.
+  *
+  * @param contextMessage the message to deliver
+  */
+ @Override
+ public void publish(final ContextMessage contextMessage) {
+   post(this.contextMessageHandlers, contextMessage);
+ }
+
+ /**
+  * @return the task return value provider this runtime was constructed with
+  */
+ MockTaskReturnValueProvider getTaskReturnValueProvider() {
+   return taskReturnValueProvider;
+ }
+ /**
+  * Appends a process request to the end of the queue. Mock REEF entities
+  * (e.g., AllocatedEvaluator, RunningTask) call this to inject the request that
+  * results from an action they initiated, e.g., RunningTask.close().
+  *
+  * @param request the process request to enqueue
+  */
+ void add(final ProcessRequest request) {
+   final List<ProcessRequest> queue = this.processRequestQueue;
+   queue.add(request);
+ }
+
+ /**
+  * @return true only if the clock reports idle, no process request is queued and
+  *     no evaluator is allocated (checked in that order)
+  */
+ private boolean isIdle() {
+   if (!this.clock.get().isIdle()) {
+     return false;
+   }
+   if (!this.processRequestQueue.isEmpty()) {
+     return false;
+   }
+   return this.allocatedEvaluatorMap.isEmpty();
+ }
+
+ /**
+  * Delivers an event to every handler in the given set.
+  *
+  * @param handlers handlers to notify
+  * @param event event to deliver; callers must pass an instance of the handlers' event type,
+  *     since the cast below is unchecked and a mismatch only surfaces inside a handler
+  * @param <T> event type accepted by the handlers
+  */
+ @SuppressWarnings("unchecked") // events arrive as Object because raw process requests expose them untyped
+ private <T> void post(final Set<EventHandler<T>> handlers, final Object event) {
+   final T typedEvent = (T) event;
+   for (final EventHandler<T> handler : handlers) {
+     handler.onNext(typedEvent);
+   }
+ }
+
+ /**
+  * Registers a new context on its evaluator, creating the evaluator's context list if
+  * none exists yet.
+  *
+  * @param context the context to track
+  * @throws IllegalStateException if the context's evaluator is not allocated
+  */
+ private void validateAndCreate(final MockActiveContext context) {
+   final String evaluatorId = context.getEvaluatorId();
+   if (!this.allocatedEvaluatorMap.containsKey(evaluatorId)) {
+     throw new IllegalStateException("unknown evaluator id " + evaluatorId);
+   }
+   if (!this.allocatedContextsMap.containsKey(evaluatorId)) {
+     this.allocatedContextsMap.put(evaluatorId, new ArrayList<MockActiveContext>());
+   }
+   final List<MockActiveContext> contexts = this.allocatedContextsMap.get(evaluatorId);
+   contexts.add(context);
+ }
+
+ /**
+  * Untracks a closed context. Only the most recently created context of an evaluator
+  * (the top of its context stack) may be closed.
+  *
+  * @param context the closed context
+  * @throws IllegalStateException if no contexts are registered for the evaluator id, if the
+  *     evaluator has no active context left, or if the context is not on top of the stack
+  */
+ private void validateAndClose(final MockClosedContext context) {
+   final String evaluatorId = context.getEvaluatorId();
+   if (!this.allocatedContextsMap.containsKey(evaluatorId)) {
+     throw new IllegalStateException("unknown evaluator id " + evaluatorId);
+   }
+   final List<MockActiveContext> contexts = this.allocatedContextsMap.get(evaluatorId);
+   if (contexts.isEmpty()) {
+     // Without this guard an empty stack surfaces as an IndexOutOfBoundsException for index -1.
+     throw new IllegalStateException("no active context on evaluator id " + evaluatorId);
+   }
+   final int top = contexts.size() - 1;
+   if (!contexts.get(top).equals(context.getMockActiveContext())) {
+     throw new IllegalStateException("closing context that is not on the top of the stack");
+   }
+   // Remove by index so that exactly the verified top element is popped.
+   contexts.remove(top);
+ }
+
+ /**
+  * Tracks a new running task under the id of its evaluator.
+  *
+  * @param task the task to track
+  * @throws IllegalStateException if a task is already tracked for that evaluator
+  */
+ private void validateAndCreate(final MockRunningTask task) {
+   final String evaluatorId = task.evaluatorID();
+   final boolean alreadyRunning = this.runningTasks.containsKey(evaluatorId);
+   if (alreadyRunning) {
+     throw new IllegalStateException("task already running on evaluator " +
+         evaluatorId);
+   }
+   this.runningTasks.put(evaluatorId, task);
+ }
+
+ /**
+  * Untracks the running task registered for the given task's evaluator.
+  *
+  * @param task the task being closed
+  * @throws IllegalStateException if no task is tracked for that evaluator
+  */
+ private void validateAndClose(final MockRunningTask task) {
+   final String evaluatorId = task.getActiveContext().getEvaluatorId();
+   final boolean isTracked = this.runningTasks.containsKey(evaluatorId);
+   if (!isTracked) {
+     throw new IllegalStateException("no task running on evaluator");
+   }
+   this.runningTasks.remove(evaluatorId);
+ }
+
+ /**
+  * Tracks a newly allocated evaluator and registers an empty context list for it.
+  *
+  * @param evaluator the evaluator to track
+  * @throws IllegalStateException if an evaluator with the same id is already tracked
+  */
+ private void validateAndCreate(final MockAllocatedEvaluator evaluator) {
+   final String evaluatorId = evaluator.getId();
+   if (this.allocatedEvaluatorMap.containsKey(evaluatorId)) {
+     throw new IllegalStateException("evaluator id " + evaluatorId + " already exists");
+   }
+   final List<MockActiveContext> noContextsYet = new ArrayList<MockActiveContext>();
+   this.allocatedEvaluatorMap.put(evaluatorId, evaluator);
+   this.allocatedContextsMap.put(evaluatorId, noContextsYet);
+ }
+
+ /**
+  * Untracks a completed evaluator.
+  *
+  * @param evaluator the completed evaluator
+  * @throws IllegalStateException if no evaluator with that id is tracked
+  */
+ private void validateAndClose(final CompletedEvaluator evaluator) {
+   final String evaluatorId = evaluator.getId();
+   final boolean isTracked = this.allocatedEvaluatorMap.containsKey(evaluatorId);
+   if (!isTracked) {
+     throw new IllegalStateException("unknown evaluator id " + evaluatorId);
+   }
+   this.allocatedEvaluatorMap.remove(evaluatorId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
new file mode 100644
index 0000000..9f25ae32
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockSuspendedTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.SuspendedTask;
+
+/**
+ * Mock suspended task that reports the id and active context of the running task
+ * it was created from.
+ */
+@Unstable
+@Private
+public final class MockSuspendedTask implements SuspendedTask {
+
+  /** Running task whose id and context are exposed. */
+  private final MockRunningTask task;
+
+  /**
+   * @param task the running task that was suspended
+   */
+  public MockSuspendedTask(final MockRunningTask task) {
+    this.task = task;
+  }
+
+  /**
+   * @return the id of the underlying task
+   */
+  @Override
+  public String getId() {
+    return task.getId();
+  }
+
+  /**
+   * @return the active context of the underlying task
+   */
+  @Override
+  public ActiveContext getActiveContext() {
+    return task.getActiveContext();
+  }
+
+  /**
+   * @return a new, empty byte array on every call
+   */
+  @Override
+  public byte[] get() {
+    final byte[] noMessage = new byte[0];
+    return noMessage;
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
new file mode 100644
index 0000000..23d3b46
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/MockUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.runtime;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * Static helpers for the mock runtime; not instantiable.
+ */
+@Unstable
+@Private
+final class MockUtils {
+
+  private MockUtils() {
+  }
+
+  /**
+   * Resolves the value bound to a named parameter in a configuration, using a fresh
+   * injector created from that configuration.
+   *
+   * @param configuration configuration to build the injector from
+   * @param name class of the named parameter to look up
+   * @param <U> type of the named parameter's value
+   * @param <T> type of the named parameter
+   * @return the instance the injector returns for the named parameter
+   * @throws IllegalStateException wrapping the InjectionException if the lookup fails
+   */
+  public static <U, T extends Name<U>> U getValue(final Configuration configuration, final Class<T> name) {
+    try {
+      return Tang.Factory.getTang()
+          .newInjector(configuration)
+          .getNamedInstance(name);
+    } catch (final InjectionException cause) {
+      throw new IllegalStateException(cause);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
new file mode 100644
index 0000000..50e7eac
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/runtime/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/**
+ * mock runtime implementation.
+ */
+package org.apache.reef.mock.driver.runtime;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
deleted file mode 100644
index fdda864..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/package-info.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-/**
- * Mock runtime API.
- *
- * Mock runtime is meant to mimic the semantics of the REEF runtime and
- * allow:
- * 1. Applications to driver the forward progress of processing REEF events.
- * See {@link org.apache.reef.mock.MockRuntime} API
- * 2. Control the advancement of the Clock and Alarm callbacks.
- * See {@link org.apache.reef.mock.runtime.MockClock}
- * 3. Inject failures into the REEF applications.
- * See {@link org.apache.reef.mock.MockFailure}
- *
- * Use {@link org.apache.reef.mock.MockConfiguration} to bind your REEF
- * driver application event handlers.
- *
- * Use {@link org.apache.reef.mock.MockRuntime#start()} to trigger the
- * driver start event and {@link org.apache.reef.mock.MockRuntime#stop()}}
- * or {@link org.apache.reef.mock.runtime.MockClock#close()} to trigger the driver
- * stop event.
- */
-package org.apache.reef.mock;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
deleted file mode 100644
index 9d6e400..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/AllocateEvaluator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockAllocatedEvalautor;
-import org.apache.reef.mock.runtime.MockFailedEvaluator;
-
-/**
- * Allocate Evaluator process request.
- */
-@Unstable
-@Private
-public final class AllocateEvaluator implements
- ProcessRequestInternal<MockAllocatedEvalautor, FailedEvaluator> {
-
- private final MockAllocatedEvalautor evaluator;
-
- public AllocateEvaluator(final MockAllocatedEvalautor evaluator) {
- this.evaluator = evaluator;
- }
-
- @Override
- public Type getType() {
- return Type.ALLOCATE_EVALUATOR;
- }
-
- @Override
- public MockAllocatedEvalautor getSuccessEvent() {
- return this.evaluator;
- }
-
- @Override
- public FailedEvaluator getFailureEvent() {
- return new MockFailedEvaluator(evaluator.getId());
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
deleted file mode 100644
index 00bdf3c..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.mock.AutoCompletable;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockActiveContext;
-import org.apache.reef.mock.runtime.MockClosedContext;
-import org.apache.reef.mock.runtime.MockFailedContext;
-
-/**
- * close context process request.
- */
-@Unstable
-@Private
-public final class CloseContext implements
- ProcessRequestInternal<ClosedContext, FailedContext>,
- AutoCompletable {
-
- private final MockActiveContext context;
-
- public CloseContext(final MockActiveContext context) {
- this.context = context;
- }
-
- @Override
- public Type getType() {
- return Type.CLOSE_CONTEXT;
- }
-
- @Override
- public MockClosedContext getSuccessEvent() {
- return new MockClosedContext(this.context);
- }
-
- @Override
- public FailedContext getFailureEvent() {
- return new MockFailedContext(this.context);
- }
-
- @Override
- public boolean doAutoComplete() {
- return !this.context.getParentContext().isPresent();
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- return new CloseEvaluator(this.context.getEvaluator());
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
deleted file mode 100644
index 6ef2b9f..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseEvaluator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockAllocatedEvalautor;
-import org.apache.reef.mock.runtime.MockFailedEvaluator;
-
-/**
- * close evaluator request.
- */
-@Unstable
-@Private
-public final class CloseEvaluator implements ProcessRequestInternal<CompletedEvaluator, FailedEvaluator> {
-
- private final MockAllocatedEvalautor evaluator;
-
- public CloseEvaluator(final MockAllocatedEvalautor evaluator) {
- this.evaluator = evaluator;
- }
-
- @Override
- public Type getType() {
- return Type.CLOSE_EVALUATOR;
- }
-
- @Override
- public CompletedEvaluator getSuccessEvent() {
- return new CompletedEvaluator() {
- @Override
- public String getId() {
- return evaluator.getId();
- }
- };
- }
-
- @Override
- public FailedEvaluator getFailureEvent() {
- // TODO[initialize remaining failed constructor fields]
- return new MockFailedEvaluator(evaluator.getId());
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
[4/4] reef git commit: [REEF-2012] Add driver restart capabilities to
reef runtime mock
Posted by mo...@apache.org.
[REEF-2012] Add driver restart capabilities to reef runtime mock
JIRA: [REEF-2012](https://issues.apache.org/jira/browse/REEF-2012)
Pull Request:
Closes #1452
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5ed56eba
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5ed56eba
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5ed56eba
Branch: refs/heads/master
Commit: 5ed56eba3b0b438dc0fdf10a37a58fcfdeb81223
Parents: 35df820
Author: Tyson Condie <tc...@apache.org>
Authored: Thu Apr 26 12:52:51 2018 -0700
Committer: Sergiy Matusevych <mo...@apache.org>
Committed: Mon Apr 30 11:09:13 2018 -0700
----------------------------------------------------------------------
.../org/apache/reef/mock/AutoCompletable.java | 45 --
.../mock/DefaultTaskReturnValueProvider.java | 41 --
.../org/apache/reef/mock/MockConfiguration.java | 173 ------
.../java/org/apache/reef/mock/MockFailure.java | 67 ---
.../java/org/apache/reef/mock/MockRuntime.java | 76 ---
.../reef/mock/MockTaskReturnValueProvider.java | 44 --
.../org/apache/reef/mock/ProcessRequest.java | 54 --
.../reef/mock/driver/AutoCompletable.java | 45 ++
.../driver/DefaultTaskReturnValueProvider.java | 41 ++
.../reef/mock/driver/MockConfiguration.java | 208 ++++++++
.../mock/driver/MockDriverRestartContext.java | 172 ++++++
.../apache/reef/mock/driver/MockFailure.java | 74 +++
.../apache/reef/mock/driver/MockRuntime.java | 92 ++++
.../driver/MockTaskReturnValueProvider.java | 44 ++
.../apache/reef/mock/driver/ProcessRequest.java | 54 ++
.../apache/reef/mock/driver/package-info.java | 40 ++
.../mock/driver/request/AllocateEvaluator.java | 72 +++
.../reef/mock/driver/request/CloseContext.java | 76 +++
.../mock/driver/request/CloseEvaluator.java | 78 +++
.../reef/mock/driver/request/CloseTask.java | 89 ++++
.../reef/mock/driver/request/CompleteTask.java | 82 +++
.../reef/mock/driver/request/CreateContext.java | 76 +++
.../driver/request/CreateContextAndTask.java | 98 ++++
.../reef/mock/driver/request/CreateTask.java | 89 ++++
.../driver/request/ProcessRequestInternal.java | 44 ++
.../request/SendMessageDriverToContext.java | 81 +++
.../driver/request/SendMessageDriverToTask.java | 81 +++
.../reef/mock/driver/request/SuspendTask.java | 90 ++++
.../reef/mock/driver/request/package-info.java | 23 +
.../mock/driver/runtime/MockActiveContext.java | 139 +++++
.../driver/runtime/MockAllocatedEvaluator.java | 172 ++++++
.../reef/mock/driver/runtime/MockClock.java | 120 +++++
.../mock/driver/runtime/MockClosedContext.java | 71 +++
.../mock/driver/runtime/MockCompletedTask.java | 57 ++
.../driver/runtime/MockEvaluatorDescriptor.java | 64 +++
.../driver/runtime/MockEvaluatorRequestor.java | 85 +++
.../mock/driver/runtime/MockFailedContext.java | 93 ++++
.../driver/runtime/MockFailedEvaluator.java | 79 +++
.../mock/driver/runtime/MockNodeDescriptor.java | 68 +++
.../mock/driver/runtime/MockRunningTask.java | 97 ++++
.../mock/driver/runtime/MockRuntimeDriver.java | 522 +++++++++++++++++++
.../mock/driver/runtime/MockSuspendedTask.java | 54 ++
.../reef/mock/driver/runtime/MockUtils.java | 48 ++
.../reef/mock/driver/runtime/package-info.java | 23 +
.../java/org/apache/reef/mock/package-info.java | 40 --
.../reef/mock/request/AllocateEvaluator.java | 72 ---
.../apache/reef/mock/request/CloseContext.java | 76 ---
.../reef/mock/request/CloseEvaluator.java | 78 ---
.../org/apache/reef/mock/request/CloseTask.java | 89 ----
.../apache/reef/mock/request/CompleteTask.java | 82 ---
.../apache/reef/mock/request/CreateContext.java | 76 ---
.../reef/mock/request/CreateContextAndTask.java | 98 ----
.../apache/reef/mock/request/CreateTask.java | 89 ----
.../mock/request/ProcessRequestInternal.java | 44 --
.../request/SendMessageDriverToContext.java | 81 ---
.../mock/request/SendMessageDriverToTask.java | 81 ---
.../apache/reef/mock/request/SuspendTask.java | 90 ----
.../apache/reef/mock/request/package-info.java | 23 -
.../reef/mock/runtime/MockActiveContext.java | 139 -----
.../mock/runtime/MockAllocatedEvalautor.java | 149 ------
.../org/apache/reef/mock/runtime/MockClock.java | 120 -----
.../reef/mock/runtime/MockClosedContext.java | 71 ---
.../reef/mock/runtime/MockCompletedTask.java | 57 --
.../mock/runtime/MockEvaluatorDescriptor.java | 64 ---
.../mock/runtime/MockEvaluatorRequestor.java | 85 ---
.../reef/mock/runtime/MockFailedContext.java | 93 ----
.../reef/mock/runtime/MockFailedEvaluator.java | 79 ---
.../reef/mock/runtime/MockNodeDescriptor.java | 68 ---
.../reef/mock/runtime/MockRunningTask.java | 97 ----
.../reef/mock/runtime/MockRuntimeDriver.java | 454 ----------------
.../reef/mock/runtime/MockSuspendedTask.java | 54 --
.../org/apache/reef/mock/runtime/MockUtils.java | 48 --
.../apache/reef/mock/runtime/package-info.java | 23 -
.../org/apache/reef/mock/BasicMockTests.java | 205 --------
.../org/apache/reef/mock/MockApplication.java | 275 ----------
.../apache/reef/mock/driver/BasicMockTests.java | 163 ++++++
.../reef/mock/driver/MockApplication.java | 275 ++++++++++
.../apache/reef/mock/driver/package-info.java | 23 +
.../java/org/apache/reef/mock/package-info.java | 23 -
.../driver/YarnDriverRestartConfiguration.java | 2 +-
.../driver/YarnDriverRuntimeRestartManager.java | 2 +-
81 files changed, 3904 insertions(+), 3625 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/AutoCompletable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/AutoCompletable.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/AutoCompletable.java
deleted file mode 100644
index 173b410..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/AutoCompletable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.annotations.Unstable;
-
-/**
- * Indicates that a process request should auto complete.
- */
-@Unstable
-public interface AutoCompletable {
-
- /**
- * @return true if should auto complete
- */
- boolean doAutoComplete();
-
- /**
- * Set auto complete.
- * @param value to set
- */
- void setAutoComplete(final boolean value);
-
- /**
- * @return auto complete process request
- */
- ProcessRequest getCompletionProcessRequest();
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/DefaultTaskReturnValueProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/DefaultTaskReturnValueProvider.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/DefaultTaskReturnValueProvider.java
deleted file mode 100644
index 2ae81d4..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/DefaultTaskReturnValueProvider.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.mock.runtime.MockRunningTask;
-
-import javax.inject.Inject;
-
-/**
- * A default task return value provider.
- */
-final class DefaultTaskReturnValueProvider implements MockTaskReturnValueProvider {
-
- @Inject
- DefaultTaskReturnValueProvider() {
-
- }
-
- @Override
- public byte[] getReturnValue(final MockRunningTask task) {
- return new byte[0];
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockConfiguration.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockConfiguration.java
deleted file mode 100644
index beec5c4..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockConfiguration.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.mock;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.driver.client.JobMessageObserver;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.context.ContextMessage;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.parameters.*;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.mock.runtime.MockClock;
-import org.apache.reef.mock.runtime.MockEvaluatorRequestor;
-import org.apache.reef.mock.runtime.MockRuntimeDriver;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.OptionalImpl;
-import org.apache.reef.tang.formats.RequiredImpl;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-/**
- * Configure a mock runtime.
- */
-@Unstable
-public class MockConfiguration extends ConfigurationModuleBuilder {
-
- /**
- * The event handler invoked right after the driver boots up.
- */
- public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
-
- /**
- * The event handler invoked right before the driver shuts down. Defaults to ignore.
- */
- public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
-
- // ***** EVALUATOR HANDLER BINDINGS:
-
- /**
- * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
- */
- public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
-
- /**
- * Event handler for completed evaluators. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed evaluators. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
-
- // ***** TASK HANDLER BINDINGS:
-
- /**
- * Event handler for task messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
-
- /**
- * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed tasks. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for running tasks. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
-
- /**
- * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
- * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
- */
- public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
-
- // ***** CONTEXT HANDLER BINDINGS:
-
- /**
- * Event handler for active context. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
- * Event handler for closed context. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
-
- /**
- * Event handler for failed context. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for context messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
-
-
- /**
- * Receiver of messages sent by the Driver to the client.
- */
- public static final OptionalImpl<JobMessageObserver> ON_JOB_MESSAGE = new OptionalImpl<>();
-
- /**
- * An implementation of a task return value provider.
- */
- public static final OptionalImpl<MockTaskReturnValueProvider> TASK_RETURN_VALUE_PROVIDER = new OptionalImpl<>();
-
- public static final ConfigurationModule CONF = new MockConfiguration()
- .bindImplementation(EvaluatorRequestor.class, MockEvaluatorRequestor.class) // requesting evaluators
- .bindImplementation(MockRuntime.class, MockRuntimeDriver.class)
- .bindImplementation(MockFailure.class, MockRuntimeDriver.class)
- .bindImplementation(Clock.class, MockClock.class)
- .bindImplementation(MockTaskReturnValueProvider.class, TASK_RETURN_VALUE_PROVIDER)
-
- // client handlers
- .bindImplementation(JobMessageObserver.class, ON_JOB_MESSAGE) // sending message to job client
-
- // Driver start/stop handlers
- .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
- .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
-
- // Evaluator handlers
- .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
- .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
- .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
-
- // Task handlers
- .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
- .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
- .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
- .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
- .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
-
- // Context handlers
- .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
- .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
- .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
- .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
-
- .build();
-
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockFailure.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockFailure.java
deleted file mode 100644
index f8822a2..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockFailure.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.reef.mock;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.task.RunningTask;
-
-import java.util.Collection;
-
-/**
- * Used to fail running REEF entities i.e., Evaluators, Contexts, Tasks.
- */
-@Unstable
-public interface MockFailure {
-
- /**
- * @return current Collection of allocated evaluators.
- */
- Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators();
-
- /**
- * Fail an allocated evaluator.
- * @param evaluator to be failed
- */
- void fail(final AllocatedEvaluator evaluator);
-
- /**
- * @return current Collection of active contexts
- */
- Collection<ActiveContext> getCurrentActiveContexts();
-
- /**
- * Fail an ActiveContext.
- * @param context to be failed
- */
- void fail(final ActiveContext context);
-
- /**
- * @return current Collection of running tasks
- */
- Collection<RunningTask> getCurrentRunningTasks();
-
- /**
- * Fail a running task.
- * @param task to be failed
- */
- void fail(final RunningTask task);
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockRuntime.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockRuntime.java
deleted file mode 100644
index 0e09f5d..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockRuntime.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.annotations.Unstable;
-
-/**
- * Mock API used to drive the evaluation of ProcessRequest
- * events, which are triggered by the Application driver.
- * Clients use this to determine whether a particular ProcessRequest
- * event should succeed or fail.
- */
-@Unstable
-public interface MockRuntime extends MockFailure {
-
- /**
- * Initiate the start time event to the application driver.
- */
- void start();
-
- /**
- * Initiate the stop time event to the application driver.
- */
- void stop();
-
- /**
- * @return true if there is an outstanding ProcessRequest
- */
- boolean hasProcessRequest();
-
- /**
- * The client (caller) is responsible for determining what
- * to do with a ProcessRequest event. There are three options:
- * 1. Pass to the succeed method, which signals success to the driver.
- * 2. Pass to the fail method, signaling failure to the driver.
- * 3. Drop it on the floor (e.g., network failure).
- *
- * @return the next ProcessRequest object to be processed.
- */
- ProcessRequest getNextProcessRequest();
-
- /**
- * The driver will be informed that the operation corresponding
- * to the ProcessRequest succeeded, and will be given any relevant
- * data structures e.g., AllocatedEvaluator, RunningTask, etc.
- *
- * @param request to be processed successfully
- */
- void succeed(final ProcessRequest request);
-
- /**
- * The driver will be informed that the operation corresponding
- * to the ProcessRequest failed, and will be given any relevant
- * data structures e.g., FailedEvaluator, FailedTask, etc.
- *
- * @param request to be failed.
- */
- void fail(final ProcessRequest request);
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockTaskReturnValueProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockTaskReturnValueProvider.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockTaskReturnValueProvider.java
deleted file mode 100644
index a0e794b..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/MockTaskReturnValueProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.mock.runtime.MockRunningTask;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-
-/**
- * Clients bind an implementation of this interface, which
- * will be used to create a mock return value for a mock
- * task execution. This return value will be returned by
- * the {@link CompletedTask#get()} method.
- */
-@Unstable
-@DefaultImplementation(DefaultTaskReturnValueProvider.class)
-public interface MockTaskReturnValueProvider {
-
- /**
- * Provide a valid return value for the {@link CompletedTask#get()} method.
- * @param task that is to be provided with a return value
- * @return {@link org.apache.reef.task.Task#call(byte[])} return value
- */
- byte[] getReturnValue(final MockRunningTask task);
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/ProcessRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/ProcessRequest.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/ProcessRequest.java
deleted file mode 100644
index 09e9691..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/ProcessRequest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock;
-
-import org.apache.reef.annotations.Unstable;
-
-/**
- * A ProcessRequest refers to an outstanding event that is
- * waiting to be processed by the REEF mock runtime. Clients
- * are responsible for deciding how a ProcessRequest should be
- * handled, by either:
- * 1. successfully processing the request
- * 2. unsuccessfully processing the request
- * 3. dropping the processing request (i.e., losing it)
- * These decisions are conveyed through the {@link MockRuntime} API.
- */
-@Unstable
-public interface ProcessRequest extends AutoCompletable {
- /**
- * process request type.
- */
- enum Type {
- ALLOCATE_EVALUATOR,
- CLOSE_EVALUATOR,
- CREATE_CONTEXT,
- CLOSE_CONTEXT,
- CREATE_TASK,
- SUSPEND_TASK,
- CLOSE_TASK,
- COMPLETE_TASK,
- CREATE_CONTEXT_AND_TASK,
- SEND_MESSAGE_DRIVER_TO_TASK,
- SEND_MESSAGE_DRIVER_TO_CONTEXT
- }
-
- Type getType();
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/AutoCompletable.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/AutoCompletable.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/AutoCompletable.java
new file mode 100644
index 0000000..3ecb0b3
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/AutoCompletable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * Indicates that a process request should auto complete.
+ */
+@Unstable
+public interface AutoCompletable {
+
+ /**
+ * @return true if should auto complete
+ */
+ boolean doAutoComplete();
+
+ /**
+ * Set auto complete.
+ * @param value to set
+ */
+ void setAutoComplete(final boolean value);
+
+ /**
+ * @return auto complete process request
+ */
+ ProcessRequest getCompletionProcessRequest();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/DefaultTaskReturnValueProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/DefaultTaskReturnValueProvider.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/DefaultTaskReturnValueProvider.java
new file mode 100644
index 0000000..d3a6be7
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/DefaultTaskReturnValueProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+
+import javax.inject.Inject;
+
+/**
+ * A default task return value provider.
+ */
+final class DefaultTaskReturnValueProvider implements MockTaskReturnValueProvider {
+
+ @Inject
+ DefaultTaskReturnValueProvider() {
+
+ }
+
+ @Override
+ public byte[] getReturnValue(final MockRunningTask task) {
+ return new byte[0];
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockConfiguration.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockConfiguration.java
new file mode 100644
index 0000000..3366220
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockConfiguration.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.mock.driver.runtime.MockClock;
+import org.apache.reef.mock.driver.runtime.MockEvaluatorRequestor;
+import org.apache.reef.mock.driver.runtime.MockRuntimeDriver;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalImpl;
+import org.apache.reef.tang.formats.RequiredImpl;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+/**
+ * Configure a mock runtime.
+ */
+@Unstable
+public class MockConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * The event handler invoked right after the driver boots up.
+ */
+ public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
+
+ /**
+ * The event handler invoked right before the driver shuts down. Defaults to ignore.
+ */
+ public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
+
+ // ***** EVALUATOR HANDLER BINDINGS:
+
+ /**
+ * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
+ */
+ public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed evaluators. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed evaluators. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
+
+ // ***** TASK HANDLER BINDINGS:
+
+ /**
+ * Event handler for task messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
+ */
+ public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed tasks. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
+ * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
+ */
+ public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
+
+ // ***** CONTEXT HANDLER BINDINGS:
+
+ /**
+ * Event handler for active context. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for closed context. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
+
+ /**
+ * Event handler for failed context. Defaults to job failure if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
+
+ /**
+ * Event handler for context messages. Defaults to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * This event is fired in place of the ON_DRIVER_STARTED when the Driver is in fact restarted after failure.
+ */
+ public static final OptionalImpl<EventHandler<DriverRestarted>> ON_DRIVER_RESTARTED = new OptionalImpl<>();
+
+ /**
+ * Event handler for running tasks in previous evaluator, when driver restarted. Defaults to crash if not bound.
+ */
+ public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
+
+ /**
+ * Event handler for active context when driver restart. Defaults to closing the context if not bound.
+ */
+ public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
+
+ /**
+ * Event handler for the event of driver restart completion, default to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<DriverRestartCompleted>> ON_DRIVER_RESTART_COMPLETED =
+ new OptionalImpl<>();
+
+ /**
+ * Event handler for evaluators that failed during driver restart, default to logging if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_DRIVER_RESTART_EVALUATOR_FAILED =
+ new OptionalImpl<>();
+
+ /**
+ * Receiver of messages sent by the Driver to the client.
+ */
+ public static final OptionalImpl<JobMessageObserver> ON_JOB_MESSAGE = new OptionalImpl<>();
+
+ /**
+ * An implementation of a task return value provider.
+ */
+ public static final OptionalImpl<MockTaskReturnValueProvider> TASK_RETURN_VALUE_PROVIDER = new OptionalImpl<>();
+
+ public static final ConfigurationModule CONF = new MockConfiguration()
+ .bindImplementation(EvaluatorRequestor.class, MockEvaluatorRequestor.class) // requesting evaluators
+ .bindImplementation(MockRuntime.class, MockRuntimeDriver.class)
+ .bindImplementation(MockFailure.class, MockRuntimeDriver.class)
+ .bindImplementation(Clock.class, MockClock.class)
+ .bindImplementation(MockTaskReturnValueProvider.class, TASK_RETURN_VALUE_PROVIDER)
+
+ // recovery handlers
+ .bindSetEntry(DriverRestartFailedEvaluatorHandlers.class, ON_DRIVER_RESTART_EVALUATOR_FAILED)
+ .bindSetEntry(DriverRestartCompletedHandlers.class, ON_DRIVER_RESTART_COMPLETED)
+ .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
+ .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
+ .bindSetEntry(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
+
+ // client handlers
+ .bindImplementation(JobMessageObserver.class, ON_JOB_MESSAGE) // sending message to job client
+
+ // Driver start/stop handlers
+ .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
+ .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
+
+ // Evaluator handlers
+ .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
+ .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
+ .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
+
+ // Task handlers
+ .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
+ .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
+ .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
+ .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
+ .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
+
+ // Context handlers
+ .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
+ .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
+ .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
+ .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
+
+ .build();
+
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockDriverRestartContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockDriverRestartContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockDriverRestartContext.java
new file mode 100644
index 0000000..4c0ec28
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockDriverRestartContext.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+import org.apache.reef.driver.restart.DriverRestarted;
+import org.apache.reef.mock.driver.runtime.MockActiveContext;
+import org.apache.reef.mock.driver.runtime.MockAllocatedEvaluator;
+import org.apache.reef.mock.driver.runtime.MockFailedEvaluator;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import java.util.*;
+
+/**
+ * Contains the runtime driver state at the time of a driver
+ * failure, triggered by {@link MockFailure}.
+ */
+public final class MockDriverRestartContext {
+
+ private final int restartAttemps;
+
+ private final StartTime startTime;
+
+ private final List<MockAllocatedEvaluator> allocatedEvaluators;
+
+ private final List<MockActiveContext> activeContexts;
+
+ private final List<MockRunningTask> runningTasks;
+
+ private final List<MockFailedEvaluator> failedEvaluators;
+
+ public MockDriverRestartContext(
+ final int restartAttemps,
+ final StartTime startTime,
+ final List<MockAllocatedEvaluator> allocatedEvaluators,
+ final List<MockActiveContext> activeContexts,
+ final List<MockRunningTask> runningTasks) {
+ this.restartAttemps = restartAttemps;
+ this.startTime = startTime;
+ this.allocatedEvaluators = allocatedEvaluators;
+ this.activeContexts = activeContexts;
+ this.runningTasks = runningTasks;
+ this.failedEvaluators = new ArrayList<>();
+ }
+
+ /**
+ * Generate a DriverRestarted event to be passed to the
+ * {@link org.apache.reef.driver.parameters.DriverRestartHandler}.
+ * @return DriverRestarted event based on the state at the time of driver failure
+ */
+ public DriverRestarted getDriverRestarted() {
+ final Set<String> expectedEvaluatorIds = new HashSet<>();
+ for (final MockAllocatedEvaluator allocatedEvaluator : this.allocatedEvaluators) {
+ expectedEvaluatorIds.add(allocatedEvaluator.getId());
+ }
+ return new DriverRestarted() {
+ @Override
+ public int getResubmissionAttempts() {
+ return restartAttemps;
+ }
+
+ @Override
+ public StartTime getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public Set<String> getExpectedEvaluatorIds() {
+ return expectedEvaluatorIds;
+ }
+ };
+ }
+
+ public DriverRestartCompleted getDriverRestartCompleted(final boolean isTimeout, final long restartDuration) {
+ return new DriverRestartCompleted() {
+ @Override
+ public Time getCompletedTime() {
+ return new StopTime(startTime.getTimestamp() + restartDuration);
+ }
+
+ @Override
+ public boolean isTimedOut() {
+ return isTimeout;
+ }
+ };
+ }
+
+ /**
+ * Pass these tasks to the {@link org.apache.reef.driver.parameters.DriverRestartTaskRunningHandlers}.
+ * @return MockRunningTasks at the time of driver failure
+ */
+ public List<MockRunningTask> getRunningTasks() {
+ return this.runningTasks;
+ }
+
+ /**
+ * Pass these active contexts to the {@link org.apache.reef.driver.parameters.DriverRestartContextActiveHandlers}.
+ * These active contexts have no tasks running.
+ * @return active contexts whose evaluator has no running task
+ */
+ public List<MockActiveContext> getIdleActiveContexts() {
+ final List<MockActiveContext> idleActiveContexts = new ArrayList<>();
+ final Set<String> activeContextsWithRunningTasks = new HashSet<>();
+ for (final MockRunningTask task : this.runningTasks) {
+ activeContextsWithRunningTasks.add(task.getActiveContext().getEvaluatorId());
+ }
+ for (final MockActiveContext context : this.activeContexts) {
+ if (!activeContextsWithRunningTasks.contains(context.getEvaluatorId())) {
+ idleActiveContexts.add(context);
+ }
+ }
+ return idleActiveContexts;
+ }
+
+ public List<MockFailedEvaluator> getFailedEvaluators() {
+ return this.failedEvaluators;
+ }
+
+ /**
+ * Fail a task.
+ * @param task to fail
+ */
+ public void failTask(final MockRunningTask task) {
+ this.runningTasks.remove(task);
+ }
+
+ /**
+ * Fail an evaluator; automatically cleans up state i.e., running tasks and contexts
+ * pertaining to the evaluator, and adds the evaluator to {@link #getFailedEvaluators()}, which
+ * can be passed to the {@link org.apache.reef.driver.parameters.DriverRestartFailedEvaluatorHandlers}.
+ * @param evaluator to fail
+ */
+ public void failEvaluator(final MockAllocatedEvaluator evaluator) {
+ if (this.allocatedEvaluators.remove(evaluator)) {
+ this.failedEvaluators.add(new MockFailedEvaluator(evaluator.getId()));
+ // cleanup
+ final Iterator<MockRunningTask> taskIter = this.runningTasks.iterator();
+ while (taskIter.hasNext()) {
+ final MockRunningTask task = taskIter.next();
+ if (task.evaluatorID().equals(evaluator.getId())) {
+ taskIter.remove();
+ }
+ }
+ final Iterator<MockActiveContext> contextIter = this.activeContexts.iterator();
+ while (contextIter.hasNext()) {
+ final MockActiveContext context = contextIter.next();
+ if (context.getEvaluatorId().equals(evaluator.getId())) {
+ contextIter.remove();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockFailure.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockFailure.java
new file mode 100644
index 0000000..09ad186
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockFailure.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.wake.time.event.StartTime;
+
+import java.util.Collection;
+
+/**
+ * Used to fail running REEF entities i.e., Evaluators, Contexts, Tasks.
+ */
+@Unstable
+public interface MockFailure {
+
+ /**
+ * @return current Collection of allocated evaluators.
+ */
+ Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators();
+
+ /**
+ * Fail an allocated evaluator.
+ * @param evaluator to be failed
+ */
+ void fail(final AllocatedEvaluator evaluator);
+
+ /**
+ * @return current Collection of active contexts
+ */
+ Collection<ActiveContext> getCurrentActiveContexts();
+
+ /**
+ * Fail an ActiveContext.
+ * @param context to be failed
+ */
+ void fail(final ActiveContext context);
+
+ /**
+ * @return current Collection of running tasks
+ */
+ Collection<RunningTask> getCurrentRunningTasks();
+
+ /**
+ * Fail a running task.
+ * @param task to be failed
+ */
+ void fail(final RunningTask task);
+
+ /**
+ * Fail the driver.
+ * @return the state of the driver at the time of the failure
+ */
+ MockDriverRestartContext failDriver(final int attempt, final StartTime startTime);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockRuntime.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockRuntime.java
new file mode 100644
index 0000000..a110ef7
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockRuntime.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.restart.DriverRestartCompleted;
+
+/**
+ * Mock API used to drive the evaluation of ProcessRequest
+ * events, which are triggered by the Application driver.
+ * Clients use this to determine whether a particular ProcessRequest
+ * event should succeed or fail.
+ */
+@Unstable
+public interface MockRuntime extends MockFailure {
+
+ /**
+ * Initiate the start time event to the application driver.
+ */
+ void start();
+
+ /**
+ * Initiate the stop time event to the application driver.
+ */
+ void stop();
+
+ /**
+ * Initiate a driver restart.
+ * @param restartContext contains the state of the driver at the time of failure
+ * @param isTimeout used to fill in {@link DriverRestartCompleted#isTimedOut()}
+ * @param duration recover time duration (added to start time)
+ */
+ void restart(final MockDriverRestartContext restartContext, final boolean isTimeout, final long duration);
+
+ /**
+ * @return true if there is an outstanding ProcessRequest
+ */
+ boolean hasProcessRequest();
+
+ /**
+ * The client (caller) is responsible for determining what
+ * to do with a ProcessRequest event. There are three options:
+ * 1. Pass to the succeed method, which signals success to the driver.
+ * 2. Pass to the fail method, signaling failure to the driver.
+ * 3. Drop it on the floor (e.g., network failure).
+ *
+ * @return the next ProcessRequest object to be processed.
+ */
+ ProcessRequest getNextProcessRequest();
+
+ /**
+ * The driver will be informed that the operation corresponding
+ * to the ProcessRequest succeeded, and will be given any relevant
+ * data structures e.g., AllocatedEvaluator, RunningTask, etc.
+ *
+ * @param request to be processed successfully
+ */
+ void succeed(final ProcessRequest request);
+
+ /**
+ * The driver will be informed that the operation corresponding
+ * to the ProcessRequest failed, and will be given any relevant
+ * data structures e.g., FailedEvaluator, FailedTask, etc.
+ *
+ * @param request to be failed.
+ */
+ void fail(final ProcessRequest request);
+
+ /**
+ * Publish a context message to the application event handlers.
+ * @param contextMessage to be published
+ */
+ void publish(final ContextMessage contextMessage);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockTaskReturnValueProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockTaskReturnValueProvider.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockTaskReturnValueProvider.java
new file mode 100644
index 0000000..6972a18
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/MockTaskReturnValueProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * Clients bind an implementation of this interface, which
+ * will be used to create a mock return value for a mock
+ * task execution. This return value will be returned by
+ * the {@link CompletedTask#get()} method.
+ */
+@Unstable
+@DefaultImplementation(DefaultTaskReturnValueProvider.class)
+public interface MockTaskReturnValueProvider {
+
+ /**
+ * Provide a valid return value for the {@link CompletedTask#get()} method.
+ * @param task that is to be provided with a return value
+ * @return {@link org.apache.reef.task.Task#call(byte[])} return value
+ */
+ byte[] getReturnValue(final MockRunningTask task);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/ProcessRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/ProcessRequest.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/ProcessRequest.java
new file mode 100644
index 0000000..cf4429e
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/ProcessRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * A ProcessRequest refers to an outstanding event that is
+ * waiting to be processed by the REEF mock runtime. Clients
+ * are responsible for deciding how a ProcessRequest should be
+ * handled, by either:
+ * 1. successfully processing the request
+ * 2. unsuccessfully processing the request
+ * 3. dropping the processing request (i.e., losing it)
+ * These decisions are conveyed through the {@link MockRuntime} API.
+ */
+@Unstable
+public interface ProcessRequest extends AutoCompletable {
+ /**
+ * process request type.
+ */
+ enum Type {
+ ALLOCATE_EVALUATOR,
+ CLOSE_EVALUATOR,
+ CREATE_CONTEXT,
+ CLOSE_CONTEXT,
+ CREATE_TASK,
+ SUSPEND_TASK,
+ CLOSE_TASK,
+ COMPLETE_TASK,
+ CREATE_CONTEXT_AND_TASK,
+ SEND_MESSAGE_DRIVER_TO_TASK,
+ SEND_MESSAGE_DRIVER_TO_CONTEXT
+ }
+
+ Type getType();
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/package-info.java
new file mode 100644
index 0000000..f9d73f5
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/**
+ * Mock runtime API.
+ *
+ * Mock runtime is meant to mimic the semantics of the REEF runtime and
+ * allow:
+ * 1. Applications to drive the forward progress of processing REEF events.
+ * See {@link org.apache.reef.mock.driver.MockRuntime} API
+ * 2. Control the advancement of the Clock and Alarm callbacks.
+ * See {@link org.apache.reef.mock.driver.runtime.MockClock}
+ * 3. Inject failures into the REEF applications.
+ * See {@link org.apache.reef.mock.driver.MockFailure}
+ *
+ * Use {@link org.apache.reef.mock.driver.MockConfiguration} to bind your REEF
+ * driver application event handlers.
+ *
+ * Use {@link org.apache.reef.mock.driver.MockRuntime#start()} to trigger the
+ * driver start event and {@link org.apache.reef.mock.driver.MockRuntime#stop()}
+ * or {@link org.apache.reef.mock.driver.runtime.MockClock#close()} to trigger the driver
+ * stop event.
+ */
+package org.apache.reef.mock.driver;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/AllocateEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/AllocateEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/AllocateEvaluator.java
new file mode 100644
index 0000000..d717fc6
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/AllocateEvaluator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockAllocatedEvaluator;
+import org.apache.reef.mock.driver.runtime.MockFailedEvaluator;
+
+/**
+ * Allocate Evaluator process request.
+ */
+@Unstable
+@Private
+public final class AllocateEvaluator implements
+ ProcessRequestInternal<MockAllocatedEvaluator, FailedEvaluator> {
+
+ private final MockAllocatedEvaluator evaluator;
+
+ public AllocateEvaluator(final MockAllocatedEvaluator evaluator) {
+ this.evaluator = evaluator;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.ALLOCATE_EVALUATOR;
+ }
+
+ @Override
+ public MockAllocatedEvaluator getSuccessEvent() {
+ return this.evaluator;
+ }
+
+ @Override
+ public FailedEvaluator getFailureEvent() {
+ return new MockFailedEvaluator(evaluator.getId());
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseContext.java
new file mode 100644
index 0000000..d6408d7
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.mock.driver.AutoCompletable;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockActiveContext;
+import org.apache.reef.mock.driver.runtime.MockClosedContext;
+import org.apache.reef.mock.driver.runtime.MockFailedContext;
+
+/**
+ * close context process request.
+ */
+@Unstable
+@Private
+public final class CloseContext implements
+ ProcessRequestInternal<ClosedContext, FailedContext>,
+ AutoCompletable {
+
+ private final MockActiveContext context;
+
+ public CloseContext(final MockActiveContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.CLOSE_CONTEXT;
+ }
+
+ @Override
+ public MockClosedContext getSuccessEvent() {
+ return new MockClosedContext(this.context);
+ }
+
+ @Override
+ public FailedContext getFailureEvent() {
+ return new MockFailedContext(this.context);
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return !this.context.getParentContext().isPresent();
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ return new CloseEvaluator(this.context.getEvaluator());
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseEvaluator.java
new file mode 100644
index 0000000..c329873
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseEvaluator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockAllocatedEvaluator;
+import org.apache.reef.mock.driver.runtime.MockFailedEvaluator;
+
+/**
+ * close evaluator request.
+ */
+@Unstable
+@Private
+public final class CloseEvaluator implements ProcessRequestInternal<CompletedEvaluator, FailedEvaluator> {
+
+ private final MockAllocatedEvaluator evaluator;
+
+ public CloseEvaluator(final MockAllocatedEvaluator evaluator) {
+ this.evaluator = evaluator;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.CLOSE_EVALUATOR;
+ }
+
+ @Override
+ public CompletedEvaluator getSuccessEvent() {
+ return new CompletedEvaluator() {
+ @Override
+ public String getId() {
+ return evaluator.getId();
+ }
+ };
+ }
+
+ @Override
+ public FailedEvaluator getFailureEvent() {
+ // TODO[initialize remaining failed constructor fields]
+ return new MockFailedEvaluator(evaluator.getId());
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseTask.java
new file mode 100644
index 0000000..c726d94
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CloseTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.mock.driver.MockTaskReturnValueProvider;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockCompletedTask;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.util.Optional;
+
+/**
+ * close task process request.
+ */
+@Unstable
+@Private
+public final class CloseTask implements ProcessRequestInternal<CompletedTask, FailedTask> {
+
+ private final MockRunningTask task;
+
+ private final MockTaskReturnValueProvider taskReturnValueProvider;
+
+ public CloseTask(
+ final MockRunningTask task,
+ final MockTaskReturnValueProvider taskReturnValueProvider) {
+ this.task = task;
+ this.taskReturnValueProvider = taskReturnValueProvider;
+ }
+
+ public MockRunningTask getTask() {
+ return task;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.CLOSE_TASK;
+ }
+
+ @Override
+ public MockCompletedTask getSuccessEvent() {
+ return new MockCompletedTask(this.task, this.taskReturnValueProvider.getReturnValue(task));
+ }
+
+ @Override
+ public FailedTask getFailureEvent() {
+ return new FailedTask(
+ task.getId(),
+ "mock",
+ Optional.<String>empty(),
+ Optional.<Throwable>empty(),
+ Optional.<byte[]>empty(),
+ Optional.of(this.task.getActiveContext()));
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CompleteTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CompleteTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CompleteTask.java
new file mode 100644
index 0000000..c38c10b
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CompleteTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.mock.driver.MockTaskReturnValueProvider;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockCompletedTask;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+
+/**
+ * complete task process request.
+ */
+@Unstable
+@Private
+public final class CompleteTask implements ProcessRequestInternal<CompletedTask, FailedTask> {
+
+ private final MockRunningTask task;
+
+ private final MockTaskReturnValueProvider returnValueProvider;
+
+ public CompleteTask(
+ final MockRunningTask task,
+ final MockTaskReturnValueProvider returnValueProvider) {
+ this.task = task;
+ this.returnValueProvider = returnValueProvider;
+ }
+
+ public MockRunningTask getTask() {
+ return task;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.COMPLETE_TASK;
+ }
+
+ @Override
+ public CompletedTask getSuccessEvent() {
+ return new MockCompletedTask(this.task, this.returnValueProvider.getReturnValue(task));
+ }
+
+ @Override
+ public FailedTask getFailureEvent() {
+ return null;
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return false;
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContext.java
new file mode 100644
index 0000000..138d070
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.mock.driver.AutoCompletable;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockActiveContext;
+import org.apache.reef.mock.driver.runtime.MockFailedContext;
+
+/**
+ * create context process request.
+ */
+@Unstable
+@Private
+public final class CreateContext implements
+ ProcessRequestInternal<MockActiveContext, FailedContext>,
+ AutoCompletable {
+
+ private final MockActiveContext context;
+
+ private boolean autoComplete = false;
+
+ public CreateContext(final MockActiveContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.CREATE_CONTEXT;
+ }
+
+ @Override
+ public MockActiveContext getSuccessEvent() {
+ return this.context;
+ }
+
+ @Override
+ public FailedContext getFailureEvent() {
+ return new MockFailedContext(this.context);
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return this.autoComplete;
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ this.autoComplete = value;
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ return new CloseContext(this.context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContextAndTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContextAndTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContextAndTask.java
new file mode 100644
index 0000000..7d724a0
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateContextAndTask.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.mock.driver.AutoCompletable;
+import org.apache.reef.mock.driver.MockTaskReturnValueProvider;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockActiveContext;
+import org.apache.reef.mock.driver.runtime.MockFailedContext;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.util.Optional;
+
+/**
+ * create context and task process request.
+ */
+@Unstable
+@Private
+public final class CreateContextAndTask implements
+ ProcessRequestInternal<Tuple<MockActiveContext, MockRunningTask>, Tuple<MockFailedContext, FailedTask>>,
+ AutoCompletable {
+
+ private final MockActiveContext context;
+
+ private final MockRunningTask task;
+
+ private final MockTaskReturnValueProvider taskReturnValueProvider;
+
+ private boolean autoComplete = true;
+
+ public CreateContextAndTask(
+ final MockActiveContext context,
+ final MockRunningTask task,
+ final MockTaskReturnValueProvider taskReturnValueProvider) {
+ this.context = context;
+ this.task = task;
+ this.taskReturnValueProvider = taskReturnValueProvider;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.CREATE_CONTEXT_AND_TASK;
+ }
+
+ @Override
+ public Tuple<MockActiveContext, MockRunningTask> getSuccessEvent() {
+ return new Tuple<>(this.context, this.task);
+ }
+
+ @Override
+ public Tuple<MockFailedContext, FailedTask> getFailureEvent() {
+ return new Tuple<>(
+ new MockFailedContext(this.context),
+ new FailedTask(
+ this.task.getId(),
+ "mock",
+ Optional.<String>empty(),
+ Optional.<Throwable>empty(),
+ Optional.<byte[]>empty(),
+ Optional.of((ActiveContext)this.context)));
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return this.autoComplete;
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ this.autoComplete = value;
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ return new CompleteTask(this.task, this.taskReturnValueProvider);
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateTask.java
new file mode 100644
index 0000000..699b0ae
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/CreateTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.mock.driver.AutoCompletable;
+import org.apache.reef.mock.driver.MockTaskReturnValueProvider;
+import org.apache.reef.mock.driver.ProcessRequest;
+import org.apache.reef.mock.driver.runtime.MockRunningTask;
+import org.apache.reef.util.Optional;
+
+/**
+ * create task process request.
+ */
+@Unstable
+@Private
+public final class CreateTask implements
+ ProcessRequestInternal<RunningTask, FailedTask>,
+ AutoCompletable {
+
+ private final MockRunningTask task;
+
+ private final MockTaskReturnValueProvider returnValueProvider;
+
+ private boolean autoComplete = true;
+
+ public CreateTask(
+ final MockRunningTask task,
+ final MockTaskReturnValueProvider returnValueProvider) {
+ this.task = task;
+ this.returnValueProvider = returnValueProvider;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.CREATE_TASK;
+ }
+
+ @Override
+ public boolean doAutoComplete() {
+ return this.autoComplete;
+ }
+
+ @Override
+ public ProcessRequest getCompletionProcessRequest() {
+ return new CompleteTask(this.task, this.returnValueProvider);
+ }
+
+ @Override
+ public void setAutoComplete(final boolean value) {
+ this.autoComplete = value;
+ }
+
+ @Override
+ public MockRunningTask getSuccessEvent() {
+ return this.task;
+ }
+
+ @Override
+ public FailedTask getFailureEvent() {
+ return new FailedTask(
+ this.task.getId(),
+ "mock",
+ Optional.<String>empty(),
+ Optional.<Throwable>empty(),
+ Optional.<byte[]>empty(),
+ Optional.of(this.task.getActiveContext()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/ProcessRequestInternal.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/ProcessRequestInternal.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/ProcessRequestInternal.java
new file mode 100644
index 0000000..d8509d6
--- /dev/null
+++ b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/driver/request/ProcessRequestInternal.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.reef.mock.driver.request;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.mock.driver.ProcessRequest;
+
+/**
+ * internal process request API.
+ * @param <S> successful event
+ * @param <F> failure event
+ */
+@Unstable
+@Private
+public interface ProcessRequestInternal<S, F> extends ProcessRequest {
+
+ /**
+ * @return the outcome of a successful processing of this request
+ */
+ S getSuccessEvent();
+
+ /**
+ * @return the outcome of an unsuccessful processing of this request
+ */
+ F getFailureEvent();
+}
[2/4] reef git commit: [REEF-2012] Add driver restart capabilities to
reef runtime mock
Posted by mo...@apache.org.
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseTask.java
deleted file mode 100644
index 504161e..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CloseTask.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.mock.MockTaskReturnValueProvider;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockCompletedTask;
-import org.apache.reef.mock.runtime.MockRunningTask;
-import org.apache.reef.util.Optional;
-
-/**
- * close task process request.
- */
-@Unstable
-@Private
-public final class CloseTask implements ProcessRequestInternal<CompletedTask, FailedTask> {
-
- private final MockRunningTask task;
-
- private final MockTaskReturnValueProvider taskReturnValueProvider;
-
- public CloseTask(
- final MockRunningTask task,
- final MockTaskReturnValueProvider taskReturnValueProvider) {
- this.task = task;
- this.taskReturnValueProvider = taskReturnValueProvider;
- }
-
- public MockRunningTask getTask() {
- return task;
- }
-
- @Override
- public Type getType() {
- return Type.CLOSE_TASK;
- }
-
- @Override
- public MockCompletedTask getSuccessEvent() {
- return new MockCompletedTask(this.task, this.taskReturnValueProvider.getReturnValue(task));
- }
-
- @Override
- public FailedTask getFailureEvent() {
- return new FailedTask(
- task.getId(),
- "mock",
- Optional.<String>empty(),
- Optional.<Throwable>empty(),
- Optional.<byte[]>empty(),
- Optional.of(this.task.getActiveContext()));
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CompleteTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CompleteTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CompleteTask.java
deleted file mode 100644
index 25e6df8..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CompleteTask.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.task.CompletedTask;
-import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.mock.MockTaskReturnValueProvider;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockCompletedTask;
-import org.apache.reef.mock.runtime.MockRunningTask;
-
-/**
- * close task process request.
- */
-@Unstable
-@Private
-public final class CompleteTask implements ProcessRequestInternal<CompletedTask, FailedTask> {
-
- private final MockRunningTask task;
-
- private final MockTaskReturnValueProvider returnValueProvider;
-
- public CompleteTask(
- final MockRunningTask task,
- final MockTaskReturnValueProvider returnValueProvider) {
- this.task = task;
- this.returnValueProvider = returnValueProvider;
- }
-
- public MockRunningTask getTask() {
- return task;
- }
-
- @Override
- public Type getType() {
- return Type.COMPLETE_TASK;
- }
-
- @Override
- public CompletedTask getSuccessEvent() {
- return new MockCompletedTask(this.task, this.returnValueProvider.getReturnValue(task));
- }
-
- @Override
- public FailedTask getFailureEvent() {
- return null;
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContext.java
deleted file mode 100644
index e9d533b..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.mock.AutoCompletable;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockActiveContext;
-import org.apache.reef.mock.runtime.MockFailedContext;
-
-/**
- * create context process request.
- */
-@Unstable
-@Private
-public final class CreateContext implements
- ProcessRequestInternal<MockActiveContext, FailedContext>,
- AutoCompletable {
-
- private final MockActiveContext context;
-
- private boolean autoComplete = false;
-
- public CreateContext(final MockActiveContext context) {
- this.context = context;
- }
-
- @Override
- public Type getType() {
- return Type.CREATE_CONTEXT;
- }
-
- @Override
- public MockActiveContext getSuccessEvent() {
- return this.context;
- }
-
- @Override
- public FailedContext getFailureEvent() {
- return new MockFailedContext(this.context);
- }
-
- @Override
- public boolean doAutoComplete() {
- return this.autoComplete;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- this.autoComplete = value;
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- return new CloseContext(this.context);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContextAndTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContextAndTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContextAndTask.java
deleted file mode 100644
index 2169bfd..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateContextAndTask.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.io.Tuple;
-import org.apache.reef.mock.AutoCompletable;
-import org.apache.reef.mock.MockTaskReturnValueProvider;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockActiveContext;
-import org.apache.reef.mock.runtime.MockFailedContext;
-import org.apache.reef.mock.runtime.MockRunningTask;
-import org.apache.reef.util.Optional;
-
-/**
- * create context and task process request.
- */
-@Unstable
-@Private
-public final class CreateContextAndTask implements
- ProcessRequestInternal<Tuple<MockActiveContext, MockRunningTask>, Tuple<MockFailedContext, FailedTask>>,
- AutoCompletable {
-
- private final MockActiveContext context;
-
- private final MockRunningTask task;
-
- private final MockTaskReturnValueProvider taskReturnValueProvider;
-
- private boolean autoComplete = true;
-
- public CreateContextAndTask(
- final MockActiveContext context,
- final MockRunningTask task,
- final MockTaskReturnValueProvider taskReturnValueProvider) {
- this.context = context;
- this.task = task;
- this.taskReturnValueProvider = taskReturnValueProvider;
- }
-
- @Override
- public Type getType() {
- return Type.CREATE_CONTEXT_AND_TASK;
- }
-
- @Override
- public Tuple<MockActiveContext, MockRunningTask> getSuccessEvent() {
- return new Tuple<>(this.context, this.task);
- }
-
- @Override
- public Tuple<MockFailedContext, FailedTask> getFailureEvent() {
- return new Tuple<>(
- new MockFailedContext(this.context),
- new FailedTask(
- this.task.getId(),
- "mock",
- Optional.<String>empty(),
- Optional.<Throwable>empty(),
- Optional.<byte[]>empty(),
- Optional.of((ActiveContext)this.context)));
- }
-
- @Override
- public boolean doAutoComplete() {
- return this.autoComplete;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- this.autoComplete = value;
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- return new CompleteTask(this.task, this.taskReturnValueProvider);
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateTask.java
deleted file mode 100644
index a5eed49..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/CreateTask.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.mock.AutoCompletable;
-import org.apache.reef.mock.MockTaskReturnValueProvider;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockRunningTask;
-import org.apache.reef.util.Optional;
-
-/**
- * create task process request.
- */
-@Unstable
-@Private
-public final class CreateTask implements
- ProcessRequestInternal<RunningTask, FailedTask>,
- AutoCompletable {
-
- private final MockRunningTask task;
-
- private final MockTaskReturnValueProvider returnValueProvider;
-
- private boolean autoComplete = true;
-
- public CreateTask(
- final MockRunningTask task,
- final MockTaskReturnValueProvider returnValueProvider) {
- this.task = task;
- this.returnValueProvider = returnValueProvider;
- }
-
- @Override
- public Type getType() {
- return Type.CREATE_TASK;
- }
-
- @Override
- public boolean doAutoComplete() {
- return this.autoComplete;
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- return new CompleteTask(this.task, this.returnValueProvider);
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- this.autoComplete = value;
- }
-
- @Override
- public MockRunningTask getSuccessEvent() {
- return this.task;
- }
-
- @Override
- public FailedTask getFailureEvent() {
- return new FailedTask(
- this.task.getId(),
- "mock",
- Optional.<String>empty(),
- Optional.<Throwable>empty(),
- Optional.<byte[]>empty(),
- Optional.of(this.task.getActiveContext()));
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/ProcessRequestInternal.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/ProcessRequestInternal.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/ProcessRequestInternal.java
deleted file mode 100644
index b9dcd2b..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/ProcessRequestInternal.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.mock.ProcessRequest;
-
-/**
- * internal process request API.
- * @param <S> successful event
- * @param <F> failure event
- */
-@Unstable
-@Private
-public interface ProcessRequestInternal<S, F> extends ProcessRequest {
-
- /**
- * @return the outcome of a successful processing of this request
- */
- S getSuccessEvent();
-
- /**
- * @return the outcome of an unsuccessful processing of this request
- */
- F getFailureEvent();
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToContext.java
deleted file mode 100644
index 74faa60..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToContext.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.mock.ProcessRequest;
-
-/**
- * send message from driver to context process request.
- */
-@Unstable
-@Private
-public final class SendMessageDriverToContext implements
- ProcessRequestInternal<Object, Object> {
-
- private final ActiveContext context;
-
- private final byte[] message;
-
- public SendMessageDriverToContext(final ActiveContext context, final byte[] message) {
- this.context = context;
- this.message = message;
- }
-
- @Override
- public Type getType() {
- return Type.SEND_MESSAGE_DRIVER_TO_CONTEXT;
- }
-
- public ActiveContext getContext() {
- return this.context;
- }
-
- public byte[] getMessage() {
- return this.message;
- }
-
- @Override
- public Object getSuccessEvent() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object getFailureEvent() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToTask.java
deleted file mode 100644
index d6dfaaa..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SendMessageDriverToTask.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.mock.ProcessRequest;
-
-/**
- * send message from driver to task process request.
- */
-@Unstable
-@Private
-public final class SendMessageDriverToTask implements
- ProcessRequestInternal<Object, Object> {
-
- private RunningTask task;
-
- private final byte[] message;
-
- public SendMessageDriverToTask(final RunningTask task, final byte[] message) {
- this.task = task;
- this.message = message;
- }
-
- @Override
- public Type getType() {
- return Type.SEND_MESSAGE_DRIVER_TO_TASK;
- }
-
- public RunningTask getTask() {
- return task;
- }
-
- public byte[] getMessage() {
- return message;
- }
-
- @Override
- public Object getSuccessEvent() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object getFailureEvent() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SuspendTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SuspendTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SuspendTask.java
deleted file mode 100644
index 74a28f6..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/SuspendTask.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.request;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.driver.task.SuspendedTask;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.runtime.MockRunningTask;
-import org.apache.reef.mock.runtime.MockSuspendedTask;
-import org.apache.reef.util.Optional;
-
-/**
- * suspend task process request.
- */
-@Unstable
-@Private
-public final class SuspendTask implements ProcessRequestInternal<SuspendedTask, FailedTask> {
-
- private final MockRunningTask task;
-
- private final Optional<byte[]> message;
-
- public SuspendTask(final MockRunningTask task, final Optional<byte[]> message) {
- this.task = task;
- this.message = message;
- }
-
- public MockRunningTask getTask() {
- return task;
- }
-
- @Override
- public Type getType() {
- return Type.SUSPEND_TASK;
- }
-
- public Optional<byte[]> getMessage() {
- return message;
- }
-
- @Override
- public MockSuspendedTask getSuccessEvent() {
- return new MockSuspendedTask(this.task);
- }
-
- @Override
- public FailedTask getFailureEvent() {
- return new FailedTask(
- this.task.getId(),
- "mock",
- Optional.<String>empty(),
- Optional.<Throwable>empty(),
- Optional.<byte[]>empty(),
- Optional.of(this.task.getActiveContext()));
- }
-
- @Override
- public boolean doAutoComplete() {
- return false;
- }
-
- @Override
- public void setAutoComplete(final boolean value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ProcessRequest getCompletionProcessRequest() {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/package-info.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/package-info.java
deleted file mode 100644
index 56bc6f0..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/request/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-/**
- * process request implementations.
- */
-package org.apache.reef.mock.request;
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockActiveContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockActiveContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockActiveContext.java
deleted file mode 100644
index ebde788..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockActiveContext.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.driver.task.TaskConfigurationOptions;
-import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
-import org.apache.reef.mock.request.CloseContext;
-import org.apache.reef.mock.request.CreateContext;
-import org.apache.reef.mock.request.CreateTask;
-import org.apache.reef.mock.request.SendMessageDriverToContext;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.util.Optional;
-
-/**
- * mock active context.
- */
-@Unstable
-@Private
-public final class MockActiveContext implements ActiveContext {
-
- private final MockRuntimeDriver mockRuntimeDriver;
-
- private final MockAllocatedEvalautor evaluator;
-
- private final Optional<MockActiveContext> parentContext;
-
- private final String contextID;
-
- MockActiveContext(
- final MockRuntimeDriver mockRuntimeDriver,
- final MockAllocatedEvalautor evalautor,
- final Optional<MockActiveContext> parentContext,
- final String contextID) {
- this.mockRuntimeDriver = mockRuntimeDriver;
- this.evaluator = evalautor;
- this.parentContext = parentContext;
- this.contextID = contextID;
- }
-
- @Override
- public int hashCode() {
- final String id = this.getEvaluatorId() + ":" + contextID;
- return id.hashCode();
- }
-
- public boolean equals(final Object that) {
- if (that instanceof MockActiveContext) {
- return this.getEvaluatorId().equals(((MockActiveContext)that).getEvaluatorId()) &&
- this.contextID.equals(((MockActiveContext)that).contextID);
- }
- return false;
- }
-
- public MockAllocatedEvalautor getEvaluator() {
- return this.evaluator;
- }
-
- public Optional<MockActiveContext> getParentContext() {
- return this.parentContext;
- }
-
- @Override
- public void close() {
- this.mockRuntimeDriver.add(new CloseContext(this));
- }
-
- @Override
- public void submitTask(final Configuration taskConf) {
- final String taskID = MockUtils.getValue(taskConf, TaskConfigurationOptions.Identifier.class);
- final MockRunningTask task = new MockRunningTask(this.mockRuntimeDriver, taskID, this);
- this.mockRuntimeDriver.add(new CreateTask(task, this.mockRuntimeDriver.getTaskReturnValueProvider()));
- }
-
- @Override
- public void submitContext(final Configuration contextConfiguration) {
- final String childContextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class);
- final MockActiveContext context = new MockActiveContext(
- this.mockRuntimeDriver,
- this.evaluator,
- Optional.of(this),
- childContextID);
- this.mockRuntimeDriver.add(new CreateContext(context));
- }
-
- @Override
- public void submitContextAndService(
- final Configuration contextConfiguration,
- final Configuration serviceConfiguration) {
- submitContext(contextConfiguration);
- }
-
- @Override
- public void sendMessage(final byte[] message) {
- this.mockRuntimeDriver.add(new SendMessageDriverToContext(this, message));
- }
-
- @Override
- public String getEvaluatorId() {
- return this.evaluator.getId();
- }
-
- @Override
- public Optional<String> getParentId() {
- return this.parentContext.isPresent() ?
- Optional.of(this.parentContext.get().getId()) :
- Optional.<String>empty();
- }
-
- @Override
- public EvaluatorDescriptor getEvaluatorDescriptor() {
- return this.evaluator.getEvaluatorDescriptor();
- }
-
- @Override
- public String getId() {
- return this.contextID;
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockAllocatedEvalautor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockAllocatedEvalautor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockAllocatedEvalautor.java
deleted file mode 100644
index b08d557..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockAllocatedEvalautor.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.driver.evaluator.EvaluatorProcess;
-import org.apache.reef.driver.task.TaskConfigurationOptions;
-import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
-import org.apache.reef.mock.request.CloseEvaluator;
-import org.apache.reef.mock.request.CreateContextAndTask;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.util.Optional;
-
-import java.io.File;
-
-/**
- * mock allocated evaluator.
- */
-@Unstable
-@Private
-public final class MockAllocatedEvalautor implements AllocatedEvaluator {
- public static final String ROOT_CONTEXT_IDENTIFIER_PREFIX = "ROOT.CONTEXT.";
-
- private final MockRuntimeDriver mockRuntimeDriver;
-
- private final String identifier;
-
- private final EvaluatorDescriptor evaluatorDescriptor;
-
- private final MockActiveContext rootContext;
-
- private boolean closed = false;
-
- MockAllocatedEvalautor(
- final MockRuntimeDriver mockRuntimeDriver,
- final String identifier,
- final EvaluatorDescriptor evaluatorDescriptor) {
- this.mockRuntimeDriver = mockRuntimeDriver;
- this.identifier = identifier;
- this.evaluatorDescriptor = evaluatorDescriptor;
- this.rootContext = new MockActiveContext(
- mockRuntimeDriver,
- this,
- Optional.<MockActiveContext>empty(),
- ROOT_CONTEXT_IDENTIFIER_PREFIX + identifier);
- }
-
- public MockActiveContext getRootContext() {
- return this.rootContext;
- }
-
- @Override
- public void addFile(final File file) {
- // ignore
- }
-
- @Override
- public void addLibrary(final File file) {
- // ignore
- }
-
- @Override
- public EvaluatorDescriptor getEvaluatorDescriptor() {
- return this.evaluatorDescriptor;
- }
-
- @Override
- public void setProcess(final EvaluatorProcess process) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- if (!this.closed) {
- this.mockRuntimeDriver.add(new CloseEvaluator(this));
- } else {
- throw new IllegalStateException("evaluator already closed");
- }
- }
-
- @Override
- public void submitTask(final Configuration taskConfiguration) {
- this.rootContext.submitTask(taskConfiguration);
- }
-
- @Override
- public void submitContext(final Configuration contextConfiguration) {
- this.rootContext.submitContext(contextConfiguration);
- }
-
- @Override
- public void submitContextAndService(
- final Configuration contextConfiguration,
- final Configuration serviceConfiguration) {
- this.rootContext.submitContextAndService(contextConfiguration, serviceConfiguration);
- }
-
- @Override
- public void submitContextAndTask(
- final Configuration contextConfiguration,
- final Configuration taskConfiguration) {
- final String contextID = MockUtils.getValue(contextConfiguration, ContextIdentifier.class);
- final String taskID = MockUtils.getValue(taskConfiguration, TaskConfigurationOptions.Identifier.class);
- final MockActiveContext mockContext = new MockActiveContext(
- this.mockRuntimeDriver,
- this,
- Optional.of(this.rootContext),
- contextID);
- final MockRunningTask mockTask = new MockRunningTask(this.mockRuntimeDriver, taskID, mockContext);
- this.mockRuntimeDriver.add(
- new CreateContextAndTask(
- mockContext,
- mockTask,
- this.mockRuntimeDriver.getTaskReturnValueProvider()));
- }
-
- @Override
- public void submitContextAndServiceAndTask(
- final Configuration contextConfiguration,
- final Configuration serviceConfiguration,
- final Configuration taskConfiguration) {
- submitContextAndTask(contextConfiguration, taskConfiguration);
- }
-
- @Override
- public String getId() {
- return this.identifier;
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClock.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClock.java
deleted file mode 100644
index cd32b4f..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClock.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.mock.MockRuntime;
-import org.apache.reef.tang.InjectionFuture;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.Time;
-import org.apache.reef.wake.time.event.Alarm;
-import org.apache.reef.wake.time.runtime.event.ClientAlarm;
-
-import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * The MockClock can be used to drive alarms set by the client application.
- */
-@Unstable
-@Private
-public final class MockClock implements Clock {
-
- private final InjectionFuture<MockRuntime> runtime;
-
- private final List<Alarm> alarmList = new ArrayList<>();
-
- private long currentTime = 0;
-
- private boolean closed = false;
-
- @Inject
- MockClock(final InjectionFuture<MockRuntime> runtime) {
- this.runtime = runtime;
- }
-
- /**
- * Advances the clock by the offset amount.
- * @param offset amount to advance clock
- */
- public void advanceClock(final int offset) {
- this.currentTime += offset;
- final Iterator<Alarm> iter = this.alarmList.iterator();
- while (iter.hasNext()) {
- final Alarm alarm = iter.next();
- if (alarm.getTimestamp() <= this.currentTime) {
- alarm.run();
- iter.remove();
- }
- }
- }
-
- /**
- * @return the current mock clock time
- */
- public long getCurrentTime() {
- return this.currentTime;
- }
-
- @Override
- public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) {
- final Alarm alarm = new ClientAlarm(this.currentTime + offset, handler);
- alarmList.add(alarm);
- return alarm;
- }
-
- @Override
- public void close() {
- if (!closed) {
- this.runtime.get().stop();
- this.closed = true;
- }
- }
-
- @Override
- public void stop() {
- close();
- }
-
- @Override
- public void stop(final Throwable exception) {
- close();
- }
-
- @Override
- public boolean isIdle() {
- return this.alarmList.size() == 0;
- }
-
- @Override
- public boolean isClosed() {
- return this.closed;
- }
-
- @Override
- public void run() {
- this.runtime.get().start();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClosedContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClosedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClosedContext.java
deleted file mode 100644
index 31d5845..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockClosedContext.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ClosedContext;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.util.Optional;
-
-/**
- * mock closed context.
- */
-@Unstable
-@Private
-public final class MockClosedContext implements ClosedContext {
-
- private final MockActiveContext mockActiveContext;
-
- public MockClosedContext(final MockActiveContext activeContext) {
- this.mockActiveContext = activeContext;
- }
-
- public MockActiveContext getMockActiveContext() {
- return this.mockActiveContext;
- }
-
- @Override
- public ActiveContext getParentContext() {
- return this.mockActiveContext.getParentContext().isPresent() ?
- this.mockActiveContext.getParentContext().get() : null;
- }
-
- @Override
- public String getId() {
- return this.mockActiveContext.getId();
- }
-
- @Override
- public String getEvaluatorId() {
- return this.mockActiveContext.getEvaluatorId();
- }
-
- @Override
- public Optional<String> getParentId() {
- return this.mockActiveContext.getParentId();
- }
-
- @Override
- public EvaluatorDescriptor getEvaluatorDescriptor() {
- return this.mockActiveContext.getEvaluatorDescriptor();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
deleted file mode 100644
index 6ad6b35..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockCompletedTask.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.task.CompletedTask;
-
-/**
- * mock completed task.
- */
-@Unstable
-@Private
-public final class MockCompletedTask implements CompletedTask {
-
- private final MockRunningTask task;
-
- private final byte[] returnValue;
-
- public MockCompletedTask(final MockRunningTask task, final byte[] returnValue) {
- this.task = task;
- this.returnValue = returnValue;
- }
-
- @Override
- public ActiveContext getActiveContext() {
- return this.task.getActiveContext();
- }
-
- @Override
- public String getId() {
- return this.task.getId();
- }
-
- @Override
- public byte[] get() {
- return this.returnValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
deleted file mode 100644
index 2d98af0..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorDescriptor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.catalog.NodeDescriptor;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.driver.evaluator.EvaluatorProcess;
-
-/**
- * mock evaluator descriptor.
- */
-@Unstable
-@Private
-public final class MockEvaluatorDescriptor implements EvaluatorDescriptor {
- private final NodeDescriptor nodeDescriptor;
-
- MockEvaluatorDescriptor(final NodeDescriptor nodeDescriptor) {
- this.nodeDescriptor = nodeDescriptor;
- }
-
- @Override
- public NodeDescriptor getNodeDescriptor() {
- return this.nodeDescriptor;
- }
-
- @Override
- public EvaluatorProcess getProcess() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getMemory() {
- return 0;
- }
-
- @Override
- public int getNumberOfCores() {
- return 1;
- }
-
- @Override
- public String getRuntimeName() {
- return "mock";
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
deleted file mode 100644
index 7f90039..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockEvaluatorRequestor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.catalog.NodeDescriptor;
-import org.apache.reef.driver.evaluator.EvaluatorRequest;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.mock.request.AllocateEvaluator;
-import org.apache.reef.tang.InjectionFuture;
-
-import javax.inject.Inject;
-import java.util.UUID;
-
-/**
- * mock evaluator requestor.
- */
-@Unstable
-@Private
-public final class MockEvaluatorRequestor implements EvaluatorRequestor {
-
- private final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver;
-
- private final InjectionFuture<MockClock> clock;
-
- @Inject
- MockEvaluatorRequestor(
- final InjectionFuture<MockClock> clock,
- final InjectionFuture<MockRuntimeDriver> mockRuntimeDriver) {
- this.clock = clock;
- this.mockRuntimeDriver = mockRuntimeDriver;
- }
-
- @Override
- public void submit(final EvaluatorRequest req) {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock closed");
- }
- final NodeDescriptor nodeDescriptor = new MockNodeDescriptor();
- final MockEvaluatorDescriptor evaluatorDescriptor = new MockEvaluatorDescriptor(nodeDescriptor);
- for (int i = 0; i < req.getNumber(); i++) {
- final MockAllocatedEvalautor mockEvaluator = new MockAllocatedEvalautor(
- this.mockRuntimeDriver.get(), UUID.randomUUID().toString(), evaluatorDescriptor);
- this.mockRuntimeDriver.get().add(new AllocateEvaluator(mockEvaluator));
- }
- }
-
- @Override
- public Builder newRequest() {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock closed");
- }
- return new Builder();
- }
-
-
- /**
- * {@link EvaluatorRequest.Builder} extended with a new submit method.
- * {@link EvaluatorRequest}s are built using this builder.
- */
- private final class Builder extends EvaluatorRequest.Builder<Builder> {
- @Override
- public void submit() {
- MockEvaluatorRequestor.this.submit(this.build());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
deleted file mode 100644
index 55eafe1..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedContext.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.util.Optional;
-
-/**
- * mock failed context.
- */
-@Unstable
-@Private
-public final class MockFailedContext implements FailedContext {
-
- private final MockActiveContext context;
-
- public MockFailedContext(final MockActiveContext context) {
- this.context = context;
- }
-
- @Override
- public Optional<ActiveContext> getParentContext() {
- return this.context.getParentContext().isPresent() ?
- Optional.of((ActiveContext)this.context.getParentContext().get()) :
- Optional.<ActiveContext>empty();
- }
-
- @Override
- public String getMessage() {
- return "mock";
- }
-
- @Override
- public Optional<String> getDescription() {
- return Optional.empty();
- }
-
- @Override
- public Optional<Throwable> getReason() {
- return Optional.empty();
- }
-
- @Override
- public Optional<byte[]> getData() {
- return Optional.empty();
- }
-
- @Override
- public Throwable asError() {
- return new Exception("mock");
- }
-
- @Override
- public String getEvaluatorId() {
- return this.context.getEvaluatorId();
- }
-
- @Override
- public Optional<String> getParentId() {
- return this.context.getParentId();
- }
-
- @Override
- public EvaluatorDescriptor getEvaluatorDescriptor() {
- return this.context.getEvaluatorDescriptor();
- }
-
- @Override
- public String getId() {
- return this.context.getId();
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
deleted file mode 100644
index d9c0c3c..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockFailedEvaluator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.exception.EvaluatorException;
-import org.apache.reef.util.Optional;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * mock failed evaluator.
- */
-@Unstable
-@Private
-public final class MockFailedEvaluator implements FailedEvaluator {
-
- private final String evaluatorID;
-
- private final List<FailedContext> failedContextList;
-
- private final Optional<FailedTask> failedTask;
-
- public MockFailedEvaluator(
- final String evaluatorID,
- final List<FailedContext> failedContextList,
- final Optional<FailedTask> failedTask) {
- this.evaluatorID = evaluatorID;
- this.failedContextList = failedContextList;
- this.failedTask = failedTask;
- }
-
- public MockFailedEvaluator(final String evaluatorID) {
- this.evaluatorID = evaluatorID;
- this.failedContextList = new ArrayList<>();
- this.failedTask = Optional.empty();
- }
-
- @Override
- public EvaluatorException getEvaluatorException() {
- return null;
- }
-
- @Override
- public List<FailedContext> getFailedContextList() {
- return this.failedContextList;
- }
-
- @Override
- public Optional<FailedTask> getFailedTask() {
- return this.failedTask;
- }
-
- @Override
- public String getId() {
- return this.evaluatorID;
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
deleted file mode 100644
index be04994..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockNodeDescriptor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.catalog.NodeDescriptor;
-import org.apache.reef.driver.catalog.RackDescriptor;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * mock node descriptor.
- */
-@Unstable
-@Private
-public final class MockNodeDescriptor implements NodeDescriptor {
- @Override
- public InetSocketAddress getInetSocketAddress() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public RackDescriptor getRackDescriptor() {
- return new RackDescriptor() {
- @Override
- public List<NodeDescriptor> getNodes() {
- final List<NodeDescriptor> nodes = new ArrayList<>();
- nodes.add(MockNodeDescriptor.this);
- return nodes;
- }
-
- @Override
- public String getName() {
- return "mock";
- }
- };
- }
-
- @Override
- public String getName() {
- return "mock";
- }
-
- @Override
- public String getId() {
- return "mock";
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
deleted file mode 100644
index bec26f4..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRunningTask.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.task.RunningTask;
-import org.apache.reef.mock.request.CloseTask;
-import org.apache.reef.mock.request.SendMessageDriverToTask;
-import org.apache.reef.mock.request.SuspendTask;
-import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
-import org.apache.reef.util.Optional;
-
-/**
- * mock running task.
- */
-@Unstable
-@Private
-public final class MockRunningTask implements RunningTask {
-
- private final MockRuntimeDriver mockRuntimeDriver;
-
- private final String taskID;
-
- private final ActiveContext context;
-
- MockRunningTask(
- final MockRuntimeDriver mockRuntimeDriver,
- final String taskID,
- final ActiveContext context) {
- this.mockRuntimeDriver = mockRuntimeDriver;
- this.taskID = taskID;
- this.context = context;
- }
-
- public String evaluatorID() {
- return this.context.getEvaluatorId();
- }
-
- @Override
- public ActiveContext getActiveContext() {
- return this.context;
- }
-
- @Override
- public void send(final byte[] message) {
- this.mockRuntimeDriver.add(new SendMessageDriverToTask(this, message));
- }
-
- @Override
- public void suspend(final byte[] message) {
- this.mockRuntimeDriver.add(new SuspendTask(this, Optional.of(message)));
- }
-
- @Override
- public void suspend() {
- this.mockRuntimeDriver.add(new SuspendTask(this, Optional.<byte[]>empty()));
- }
-
- @Override
- public void close(final byte[] message) {
- this.mockRuntimeDriver.add(new CloseTask(this, this.mockRuntimeDriver.getTaskReturnValueProvider()));
- }
-
- @Override
- public void close() {
- this.mockRuntimeDriver.add(new CloseTask(this, this.mockRuntimeDriver.getTaskReturnValueProvider()));
- }
-
- @Override
- public TaskRepresenter getTaskRepresenter() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getId() {
- return this.taskID;
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
deleted file mode 100644
index 0b89e61..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockRuntimeDriver.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.context.ContextMessage;
-import org.apache.reef.driver.context.FailedContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CompletedEvaluator;
-import org.apache.reef.driver.evaluator.FailedEvaluator;
-import org.apache.reef.driver.parameters.*;
-import org.apache.reef.driver.task.*;
-import org.apache.reef.io.Tuple;
-import org.apache.reef.mock.MockRuntime;
-import org.apache.reef.mock.MockTaskReturnValueProvider;
-import org.apache.reef.mock.ProcessRequest;
-import org.apache.reef.mock.request.*;
-import org.apache.reef.tang.InjectionFuture;
-import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.util.Optional;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.time.Clock;
-import org.apache.reef.wake.time.event.StartTime;
-import org.apache.reef.wake.time.event.StopTime;
-
-import javax.inject.Inject;
-import java.util.*;
-
-/**
- * mock runtime driver.
- */
-@Unstable
-@Private
-public final class MockRuntimeDriver implements MockRuntime {
-
- private final InjectionFuture<MockClock> clock;
-
- private final List<ProcessRequest> processRequestQueue = new ArrayList<>();
-
- private final Set<EventHandler<StartTime>> driverStartHandlers;
-
- private final Set<EventHandler<StopTime>> driverStopHandlers;
-
- private final Set<EventHandler<AllocatedEvaluator>> allocatedEvaluatorHandlers;
-
- private final Set<EventHandler<CompletedEvaluator>> completedEvaluatorHandlers;
-
- private final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers;
-
- private final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers;
-
- private final Set<EventHandler<FailedTask>> taskFailedHandlers;
-
- private final Set<EventHandler<TaskMessage>> taskMessageHandlers;
-
- private final Set<EventHandler<CompletedTask>> taskCompletedHandlers;
-
- private final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers;
-
- private final Set<EventHandler<ActiveContext>> contextActiveHandlers;
-
- private final Set<EventHandler<CloseContext>> contextClosedHandlers;
-
- private final Set<EventHandler<ContextMessage>> contextMessageHandlers;
-
- private final Set<EventHandler<FailedContext>> contextFailedHandlers;
-
- private final Map<String, MockAllocatedEvalautor> allocatedEvaluatorMap = new HashMap<>();
-
- private final Map<String, List<MockActiveContext>> allocatedContextsMap = new HashMap<>();
-
- private final Map<String, MockRunningTask> runningTasks = new HashMap<>();
-
- private final MockTaskReturnValueProvider taskReturnValueProvider;
-
- @Inject
- MockRuntimeDriver(
- final InjectionFuture<MockClock> clock,
- final MockTaskReturnValueProvider taskReturnValueProvider,
- @Parameter(DriverStartHandler.class) final Set<EventHandler<StartTime>> driverStartHandlers,
- @Parameter(Clock.StopHandler.class) final Set<EventHandler<StopTime>> driverStopHandlers,
- @Parameter(EvaluatorAllocatedHandlers.class) final Set<EventHandler<AllocatedEvaluator>>
- allocatedEvaluatorHandlers,
- @Parameter(EvaluatorCompletedHandlers.class) final Set<EventHandler<CompletedEvaluator>>
- completedEvaluatorHandlers,
- @Parameter(EvaluatorFailedHandlers.class) final Set<EventHandler<FailedEvaluator>> failedEvaluatorHandlers,
- @Parameter(TaskRunningHandlers.class) final Set<EventHandler<TaskRunningHandlers>> taskRunningHandlers,
- @Parameter(TaskFailedHandlers.class) final Set<EventHandler<FailedTask>> taskFailedHandlers,
- @Parameter(TaskMessageHandlers.class) final Set<EventHandler<TaskMessage>> taskMessageHandlers,
- @Parameter(TaskCompletedHandlers.class) final Set<EventHandler<CompletedTask>> taskCompletedHandlers,
- @Parameter(TaskSuspendedHandlers.class) final Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
- @Parameter(ContextActiveHandlers.class) final Set<EventHandler<ActiveContext>> contextActiveHandlers,
- @Parameter(ContextClosedHandlers.class) final Set<EventHandler<CloseContext>> contextClosedHandlers,
- @Parameter(ContextMessageHandlers.class) final Set<EventHandler<ContextMessage>> contextMessageHandlers,
- @Parameter(ContextFailedHandlers.class) final Set<EventHandler<FailedContext>> contextFailedHandlers) {
- this.clock = clock;
- this.taskReturnValueProvider = taskReturnValueProvider;
- this.driverStartHandlers = driverStartHandlers;
- this.driverStopHandlers = driverStopHandlers;
- this.allocatedEvaluatorHandlers = allocatedEvaluatorHandlers;
- this.completedEvaluatorHandlers = completedEvaluatorHandlers;
- this.failedEvaluatorHandlers = failedEvaluatorHandlers;
- this.taskRunningHandlers = taskRunningHandlers;
- this.taskFailedHandlers = taskFailedHandlers;
- this.taskMessageHandlers = taskMessageHandlers;
- this.taskCompletedHandlers = taskCompletedHandlers;
- this.taskSuspendedHandlers = taskSuspendedHandlers;
- this.contextActiveHandlers = contextActiveHandlers;
- this.contextClosedHandlers = contextClosedHandlers;
- this.contextMessageHandlers = contextMessageHandlers;
- this.contextFailedHandlers = contextFailedHandlers;
- }
-
- @Override
- public Collection<AllocatedEvaluator> getCurrentAllocatedEvaluators() {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- return new ArrayList<AllocatedEvaluator>(this.allocatedEvaluatorMap.values());
- }
-
- @Override
- public void fail(final AllocatedEvaluator evaluator) {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- if (this.allocatedEvaluatorMap.containsKey(evaluator.getId())) {
- FailedTask failedTask = null;
- if (this.runningTasks.containsKey(evaluator.getId())) {
- final RunningTask task = this.runningTasks.remove(evaluator.getId());
- failedTask = new FailedTask(
- task.getId(),
- "mock",
- Optional.<String>empty(),
- Optional.<Throwable>empty(),
- Optional.<byte[]>empty(),
- Optional.<ActiveContext>of(task.getActiveContext()));
- }
- final List<FailedContext> failedContexts = new ArrayList<>();
- for (final MockActiveContext context : this.allocatedContextsMap.get(evaluator.getId())) {
- failedContexts.add(new MockFailedContext(context));
- }
- this.allocatedContextsMap.remove(evaluator.getId());
-
- post(this.failedEvaluatorHandlers, new MockFailedEvaluator(
- evaluator.getId(), failedContexts,
- failedTask == null ? Optional.<FailedTask>empty() : Optional.of(failedTask)));
- } else {
- throw new IllegalStateException("unknown evaluator " + evaluator);
- }
- }
-
- @Override
- public Collection<ActiveContext> getCurrentActiveContexts() {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- final List<ActiveContext> currentActiveContexts = new ArrayList<>();
- for (final List<MockActiveContext> contexts : this.allocatedContextsMap.values()) {
- currentActiveContexts.addAll(contexts);
- }
- return currentActiveContexts;
- }
-
- @Override
- public void fail(final ActiveContext context) {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- final MockAllocatedEvalautor evaluator = ((MockActiveContext) context).getEvaluator();
- post(this.contextFailedHandlers, new MockFailedContext((MockActiveContext) context));
- if (!((MockActiveContext) context).getParentContext().isPresent()) {
- // root context failure shuts evalautor down
- fail(evaluator);
- } else {
- this.allocatedContextsMap.get(evaluator.getId()).remove(context);
- }
- }
-
- @Override
- public Collection<RunningTask> getCurrentRunningTasks() {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- return new ArrayList<RunningTask>(this.runningTasks.values());
- }
-
- @Override
- public void fail(final RunningTask task) {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- final String evaluatorID = task.getActiveContext().getEvaluatorId();
- if (this.runningTasks.containsKey(evaluatorID) &&
- this.runningTasks.get(evaluatorID).equals(task)) {
- this.runningTasks.remove(evaluatorID);
- post(taskFailedHandlers, new FailedTask(
- task.getId(),
- "mock",
- Optional.<String>empty(),
- Optional.<Throwable>empty(),
- Optional.<byte[]>empty(),
- Optional.of(task.getActiveContext())));
- } else {
- throw new IllegalStateException("unknown running task " + task);
- }
- }
-
- @Override
- public void start() {
- post(this.driverStartHandlers, new StartTime(this.clock.get().getCurrentTime()));
- }
-
- @Override
- public void stop() {
- post(this.driverStopHandlers, new StopTime(this.clock.get().getCurrentTime()));
- }
-
- @Override
- public boolean hasProcessRequest() {
- return this.processRequestQueue.size() > 0;
- }
-
- @Override
- public ProcessRequest getNextProcessRequest() {
- if (this.processRequestQueue.size() > 0) {
- return this.processRequestQueue.remove(0);
- } else {
- return null;
- }
- }
-
- @Override
- public void succeed(final ProcessRequest pr) {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- final ProcessRequestInternal request = (ProcessRequestInternal) pr;
- switch (request.getType()) {
- case ALLOCATE_EVALUATOR:
- final MockAllocatedEvalautor allocatedEvalautor = ((AllocateEvaluator)request).getSuccessEvent();
- validateAndCreate(allocatedEvalautor);
- post(this.allocatedEvaluatorHandlers, allocatedEvalautor);
- post(this.contextActiveHandlers, allocatedEvalautor.getRootContext());
- break;
- case CLOSE_EVALUATOR:
- final CompletedEvaluator closedEvaluator = ((CloseEvaluator)request).getSuccessEvent();
- validateAndClose(closedEvaluator);
- post(this.completedEvaluatorHandlers, closedEvaluator);
- break;
- case CREATE_CONTEXT:
- final MockActiveContext createContext = ((CreateContext) request).getSuccessEvent();
- validateAndCreate(createContext);
- post(this.contextActiveHandlers, createContext);
- break;
- case CLOSE_CONTEXT:
- final MockClosedContext closeContext = ((CloseContext) request).getSuccessEvent();
- validateAndClose(closeContext);
- post(this.contextClosedHandlers, closeContext);
- break;
- case CREATE_TASK:
- final MockRunningTask createTask = ((CreateTask)request).getSuccessEvent();
- validateAndCreate(createTask);
- post(this.taskRunningHandlers, request.getSuccessEvent());
- break;
- case SUSPEND_TASK:
- final MockRunningTask suspendedTask = ((SuspendTask)request).getTask();
- validateAndClose(suspendedTask);
- post(this.taskSuspendedHandlers, request.getSuccessEvent());
- break;
- case CLOSE_TASK:
- case COMPLETE_TASK:
- final MockRunningTask completedTask = ((CompleteTask)request).getTask();
- validateAndClose(completedTask);
- post(this.taskCompletedHandlers, request.getSuccessEvent());
- break;
- case CREATE_CONTEXT_AND_TASK:
- final CreateContextAndTask createContextTask = (CreateContextAndTask) request;
- final Tuple<MockActiveContext, MockRunningTask> events = createContextTask.getSuccessEvent();
- validateAndCreate(events.getKey());
- post(this.contextActiveHandlers, events.getKey());
- validateAndCreate(events.getValue());
- post(this.taskRunningHandlers, events.getValue());
- break;
- case SEND_MESSAGE_DRIVER_TO_TASK:
- // ignore
- break;
- case SEND_MESSAGE_DRIVER_TO_CONTEXT:
- // ignore
- break;
- default:
- throw new IllegalStateException("unknown type");
- }
-
- if (request.doAutoComplete()) {
- add(request.getCompletionProcessRequest());
- } else if (!this.clock.get().isClosed() && isIdle()) {
- this.clock.get().close();
- }
- }
-
- @Override
- public void fail(final ProcessRequest pr) {
- if (this.clock.get().isClosed()) {
- throw new IllegalStateException("clock is closed");
- }
- final ProcessRequestInternal request = (ProcessRequestInternal) pr;
- switch (request.getType()) {
- case ALLOCATE_EVALUATOR:
- post(this.failedEvaluatorHandlers, request.getFailureEvent());
- break;
- case CLOSE_EVALUATOR:
- final CompletedEvaluator evaluator = ((CloseEvaluator)request).getSuccessEvent();
- validateAndClose(evaluator);
- post(this.failedEvaluatorHandlers, request.getFailureEvent());
- break;
- case CREATE_CONTEXT:
- post(this.contextFailedHandlers, request.getFailureEvent());
- break;
- case CLOSE_CONTEXT:
- final MockClosedContext context = ((CloseContext)request).getSuccessEvent();
- validateAndClose(context);
- if (context.getParentContext() == null) {
- add(new CloseEvaluator(context.getMockActiveContext().getEvaluator()));
- }
- post(this.contextFailedHandlers, request.getFailureEvent());
- break;
- case CREATE_TASK:
- post(this.taskFailedHandlers, request.getFailureEvent());
- break;
- case SUSPEND_TASK:
- validateAndClose(((SuspendTask)request).getTask());
- post(this.taskFailedHandlers, request.getFailureEvent());
- break;
- case CLOSE_TASK:
- case COMPLETE_TASK:
- validateAndClose(((CloseTask)request).getTask());
- post(this.taskFailedHandlers, request.getFailureEvent());
- break;
- case CREATE_CONTEXT_AND_TASK:
- final CreateContextAndTask createContextTask = (CreateContextAndTask) request;
- final Tuple<MockFailedContext, FailedTask> events = createContextTask.getFailureEvent();
- post(this.taskFailedHandlers, events.getValue());
- post(this.contextFailedHandlers, events.getKey());
- break;
- case SEND_MESSAGE_DRIVER_TO_TASK:
- // ignore
- break;
- case SEND_MESSAGE_DRIVER_TO_CONTEXT:
- // ignore
- break;
- default:
- throw new IllegalStateException("unknown type");
- }
-
- if (!this.clock.get().isClosed() && isIdle()) {
- this.clock.get().close();
- }
- }
-
- MockTaskReturnValueProvider getTaskReturnValueProvider() {
- return this.taskReturnValueProvider;
- }
- /**
- * Used by mock REEF entities (e.g., AllocatedEvaluator, RunningTask) to inject
- * process requests from initiated actions e.g., RunningTask.close().
- * @param request to inject
- */
- void add(final ProcessRequest request) {
- this.processRequestQueue.add(request);
- }
-
- private boolean isIdle() {
- return this.clock.get().isIdle() &&
- this.processRequestQueue.isEmpty() &&
- this.allocatedEvaluatorMap.isEmpty();
- }
-
- private <T> void post(final Set<EventHandler<T>> handlers, final Object event) {
- for (final EventHandler<T> handler : handlers) {
- handler.onNext((T) event);
- }
- }
-
- private void validateAndCreate(final MockActiveContext context) {
- if (!this.allocatedEvaluatorMap.containsKey(context.getEvaluatorId())) {
- throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId());
- } else if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) {
- this.allocatedContextsMap.put(context.getEvaluatorId(), new ArrayList<MockActiveContext>());
- }
- this.allocatedContextsMap.get(context.getEvaluatorId()).add(context);
- }
-
- private void validateAndClose(final MockClosedContext context) {
- if (!this.allocatedContextsMap.containsKey(context.getEvaluatorId())) {
- throw new IllegalStateException("unknown evaluator id " + context.getEvaluatorId());
- }
- final List<MockActiveContext> contexts = this.allocatedContextsMap.get(context.getEvaluatorId());
- if (!contexts.get(contexts.size() - 1).equals(context.getMockActiveContext())) {
- throw new IllegalStateException("closing context that is not on the top of the stack");
- }
- contexts.remove(context.getMockActiveContext());
- }
-
- private void validateAndCreate(final MockRunningTask task) {
- if (this.runningTasks.containsKey(task.evaluatorID())) {
- throw new IllegalStateException("task already running on evaluator " +
- task.evaluatorID());
- }
- this.runningTasks.put(task.evaluatorID(), task);
- }
-
- private void validateAndClose(final MockRunningTask task) {
- if (!this.runningTasks.containsKey(task.getActiveContext().getEvaluatorId())) {
- throw new IllegalStateException("no task running on evaluator");
- }
- this.runningTasks.remove(task.getActiveContext().getEvaluatorId());
- }
-
- private void validateAndCreate(final MockAllocatedEvalautor evalutor) {
- if (this.allocatedEvaluatorMap.containsKey(evalutor.getId())) {
- throw new IllegalStateException("evaluator id " + evalutor.getId() + " already exists");
- }
- this.allocatedEvaluatorMap.put(evalutor.getId(), evalutor);
- this.allocatedContextsMap.put(evalutor.getId(), new ArrayList<MockActiveContext>());
- this.allocatedContextsMap.get(evalutor.getId()).add(evalutor.getRootContext());
- }
-
- private void validateAndClose(final CompletedEvaluator evalautor) {
- if (!this.allocatedEvaluatorMap.containsKey(evalautor.getId())) {
- throw new IllegalStateException("unknown evaluator id " + evalautor.getId());
- }
- this.allocatedEvaluatorMap.remove(evalautor.getId());
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/5ed56eba/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java b/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
deleted file mode 100644
index 84569ff..0000000
--- a/lang/java/reef-runtime-mock/src/main/java/org/apache/reef/mock/runtime/MockSuspendedTask.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.reef.mock.runtime;
-
-import org.apache.reef.annotations.Unstable;
-import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.task.SuspendedTask;
-
-/**
- * mock suspended task.
- */
-@Unstable
-@Private
-public final class MockSuspendedTask implements SuspendedTask {
-
- private final MockRunningTask task;
-
- public MockSuspendedTask(final MockRunningTask task) {
- this.task = task;
- }
-
- @Override
- public ActiveContext getActiveContext() {
- return this.task.getActiveContext();
- }
-
- @Override
- public byte[] get() {
- return new byte[0];
- }
-
- @Override
- public String getId() {
- return this.task.getId();
- }
-}