You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:02 UTC

[29/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
new file mode 100644
index 0000000..c42cba5
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerREEFYarn.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import java.io.IOException;
+
+import static org.apache.reef.examples.scheduler.SchedulerREEF.runTaskScheduler;
+
+/**
+ * Entry point that runs the REEF TaskScheduler example on the YARN runtime.
+ */
+public final class SchedulerREEFYarn {
+  /**
+   * Builds the YARN client configuration and passes it, together with the
+   * command line arguments, to {@code SchedulerREEF.runTaskScheduler}.
+   *
+   * @param args command line arguments, forwarded unchanged to the scheduler.
+   * @throws InjectionException if raised while building the configuration or running the scheduler.
+   * @throws IOException        if raised while building the configuration or running the scheduler.
+   * @throws ParseException     if raised while building the configuration or running the scheduler.
+   */
+  public final static void main(String[] args)
+      throws InjectionException, IOException, ParseException {
+    final Configuration yarnConfiguration = YarnClientConfiguration.CONF.build();
+    runTaskScheduler(yarnConfiguration, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
new file mode 100644
index 0000000..50293e6
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/SchedulerResponse.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * Immutable response returned by the Scheduler.
+ * It carries an HTTP-style status code and a message, and is created
+ * through the static factory methods only.
+ */
+final class SchedulerResponse {
+  /**
+   * 200 OK : The request succeeded normally.
+   */
+  private static final int SC_OK = 200;
+
+  /**
+   * 400 BAD REQUEST : The request is syntactically incorrect.
+   */
+  private static final int SC_BAD_REQUEST = 400;
+
+  /**
+   * 403 FORBIDDEN : Syntactically okay but refused to process.
+   */
+  private static final int SC_FORBIDDEN = 403;
+
+  /**
+   * 404 NOT FOUND :  The resource is not available.
+   */
+  private static final int SC_NOT_FOUND = 404;
+
+  /**
+   * Status code of the request based on RFC 2068.
+   * Assigned once in the constructor, hence final.
+   */
+  private final int status;
+
+  /**
+   * Message to send.
+   * Assigned once in the constructor, hence final.
+   */
+  private final String message;
+
+  /**
+   * Constructor using status code and message.
+   * Private so that the status can only be one of the codes
+   * offered by the static factory methods of this class.
+   *
+   * @param status  status code of the response.
+   * @param message message to send; stored as given, without a null check.
+   */
+  private SchedulerResponse(final int status, final String message) {
+    this.status = status;
+    this.message = message;
+  }
+
+  /**
+   * Create a response with OK status.
+   *
+   * @param message message to send.
+   * @return a new response with status code 200.
+   */
+  public static SchedulerResponse OK(final String message) {
+    return new SchedulerResponse(SC_OK, message);
+  }
+
+  /**
+   * Create a response with BAD_REQUEST status.
+   *
+   * @param message message to send.
+   * @return a new response with status code 400.
+   */
+  public static SchedulerResponse BAD_REQUEST(final String message) {
+    return new SchedulerResponse(SC_BAD_REQUEST, message);
+  }
+
+  /**
+   * Create a response with FORBIDDEN status.
+   *
+   * @param message message to send.
+   * @return a new response with status code 403.
+   */
+  public static SchedulerResponse FORBIDDEN(final String message) {
+    return new SchedulerResponse(SC_FORBIDDEN, message);
+  }
+
+  /**
+   * Create a response with NOT FOUND status.
+   *
+   * @param message message to send.
+   * @return a new response with status code 404.
+   */
+  public static SchedulerResponse NOT_FOUND(final String message) {
+    return new SchedulerResponse(SC_NOT_FOUND, message);
+  }
+
+  /**
+   * Return {@code true} if the response is OK, i.e. its status code is 200.
+   */
+  public boolean isOK() {
+    return this.status == SC_OK;
+  }
+
+  /**
+   * Return the status code of this response.
+   */
+  int getStatus() {
+    return status;
+  }
+
+  /**
+   * Return the message of this response.
+   */
+  String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
new file mode 100644
index 0000000..fe777ff
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/TaskEntity.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.scheduler;
+
+/**
+ * TaskEntity represents a single entry of the task queue used in the
+ * scheduler. Since REEF already has a class named {@code Task},
+ * a different name is used for this class.
+ */
+final class TaskEntity {
+  private final int taskId;
+  private final String command;
+
+  /**
+   * @param taskId  identifier assigned to this task.
+   * @param command command of this task; stored as given, without a null check.
+   */
+  public TaskEntity(final int taskId, final String command) {
+    this.taskId = taskId;
+    this.command = command;
+  }
+
+  /**
+   * Return the TaskID assigned to this Task.
+   */
+  int getId() {
+    return taskId;
+  }
+
+  /**
+   * Return the command of this Task.
+   */
+  String getCommand() {
+    return command;
+  }
+
+  /**
+   * Two entities are equal when they are of the same class and both
+   * the task id and the command match.
+   */
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final TaskEntity that = (TaskEntity) o;
+    if (taskId != that.taskId) {
+      return false;
+    }
+    // The constructor accepts a null command, so compare null-safely
+    // instead of dereferencing the field unconditionally.
+    return command == null ? that.command == null : command.equals(that.command);
+  }
+
+  /**
+   * Hash code consistent with {@link #equals(Object)}; a null command contributes 0.
+   */
+  @Override
+  public int hashCode() {
+    final int commandHash = command == null ? 0 : command.hashCode();
+    return 31 * taskId + commandHash;
+  }
+
+  @Override
+  public String toString() {
+    return "<Id=" + taskId + ", Command=" + command + ">";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
new file mode 100644
index 0000000..728806c
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/scheduler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task scheduler example based on reef-webserver.
+ */
+package org.apache.reef.examples.scheduler;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
new file mode 100644
index 0000000..db312ce
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Control.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.LoggingEventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Command line client of the suspend/resume example: writes a single
+ * "command taskId" message to a listener on a local port.
+ */
+public final class Control {
+
+  private static final Logger LOG = Logger.getLogger(Control.class.getName());
+  private final transient String command;
+  private final transient String taskId;
+  private final transient int port;
+
+  /**
+   * @param port    local port the message is sent to.
+   * @param taskId  id of the task the command refers to.
+   * @param command command to send; trimmed and lower-cased before use.
+   */
+  @Inject
+  public Control(@Parameter(SuspendClientControl.Port.class) final int port,
+                 @Parameter(TaskId.class) final String taskId,
+                 @Parameter(Command.class) final String command) {
+    this.port = port;
+    this.taskId = taskId;
+    this.command = command.trim().toLowerCase();
+  }
+
+  /**
+   * Parse the port, task id and command parameters from the command line.
+   *
+   * @param args command line arguments, as passed to main().
+   * @return configuration built from the parsed parameters.
+   */
+  private static Configuration getConfig(final String[] args) throws IOException, BindException {
+    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+    final CommandLine commandLine = new CommandLine(builder);
+    commandLine.processCommandLine(args, SuspendClientControl.Port.class, TaskId.class, Command.class);
+    return builder.build();
+  }
+
+  /**
+   * Instantiate {@code Control} from the command line arguments and run it.
+   *
+   * @param args command line arguments.
+   */
+  public static void main(final String[] args) throws Exception {
+    final Injector injector = Tang.Factory.getTang().newInjector(getConfig(args));
+    injector.getInstance(Control.class).run();
+  }
+
+  /**
+   * Open a link to localhost on the configured port and write the
+   * command followed by a space and the task id.
+   */
+  public void run() throws Exception {
+
+    LOG.log(Level.INFO, "command: {0} task: {1} port: {2}",
+        new Object[]{this.command, this.taskId, this.port});
+
+    final ObjectSerializableCodec<String> codec = new ObjectSerializableCodec<>();
+
+    // Incoming transport events are only logged; stage errors are rethrown unchecked.
+    final EventHandler<TransportEvent> eventLogger = new LoggingEventHandler<TransportEvent>();
+    final EventHandler<Throwable> errorHandler = new EventHandler<Throwable>() {
+      @Override
+      public void onNext(final Throwable throwable) {
+        throw new RuntimeException(throwable);
+      }
+    };
+    final EStage<TransportEvent> stage =
+        new ThreadPoolStage<>("suspend-control-client", eventLogger, 1, errorHandler);
+
+    try (final Transport transport = new NettyMessagingTransport("localhost", 0, stage, stage, 1, 10000)) {
+      final InetSocketAddress listenerAddress = new InetSocketAddress("localhost", this.port);
+      final Link link = transport.open(listenerAddress, codec, null);
+      final String message = this.command + " " + this.taskId;
+      link.write(message);
+    }
+  }
+
+  /**
+   * Command line parameter: id of the task to control.
+   */
+  @NamedParameter(doc = "Task id", short_name = "task")
+  public static final class TaskId implements Name<String> {
+  }
+
+  /**
+   * Command line parameter: the command to send.
+   */
+  @NamedParameter(doc = "Command: 'suspend' or 'resume'", short_name = "cmd")
+  public static final class Command implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java
new file mode 100644
index 0000000..696d02d
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/Launch.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.client.ClientConfiguration;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.CommandLine;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Suspend/Resume example - main class.
+ * Parses the command line, assembles the client configuration and runs
+ * the {@link SuspendClient} until it reports completion.
+ */
+public final class Launch {
+
+  /**
+   * Standard Java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+  /**
+   * Number of REEF worker threads in local mode.
+   */
+  private static final int NUM_LOCAL_THREADS = 4;
+
+  /**
+   * This class should not be instantiated.
+   */
+  private Launch() {
+    throw new RuntimeException("Do not instantiate this class!");
+  }
+
+  /**
+   * Register the short names of the example's parameters and parse the command line.
+   *
+   * @param args command line arguments, as passed to main()
+   * @return Configuration object built from the parsed arguments.
+   */
+  private static Configuration parseCommandLine(final String[] args)
+      throws IOException, BindException {
+    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+    final CommandLine commandLine = new CommandLine(builder);
+    commandLine.registerShortNameOfClass(Local.class);
+    commandLine.registerShortNameOfClass(NumCycles.class);
+    commandLine.registerShortNameOfClass(Delay.class);
+    commandLine.registerShortNameOfClass(SuspendClientControl.Port.class);
+    commandLine.processCommandLine(args);
+    return builder.build();
+  }
+
+  /**
+   * Copy the NumCycles, Delay and Port parameters of the command line
+   * configuration into a fresh configuration. The Local flag is not copied.
+   *
+   * @param commandLineConf configuration parsed from the command line.
+   * @return new configuration holding the three copied parameters.
+   */
+  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+      throws InjectionException, BindException {
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    return Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(NumCycles.class,
+            String.valueOf(injector.getNamedInstance(NumCycles.class)))
+        .bindNamedParameter(Delay.class,
+            String.valueOf(injector.getNamedInstance(Delay.class)))
+        .bindNamedParameter(SuspendClientControl.Port.class,
+            String.valueOf(injector.getNamedInstance(SuspendClientControl.Port.class)))
+        .build();
+  }
+
+  /**
+   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+   * The result merges the runtime configuration (local or YARN, selected by
+   * the Local flag), the client event handlers and the copied command line parameters.
+   *
+   * @param args Command line arguments, as passed into main().
+   * @return (immutable) TANG Configuration object.
+   * @throws BindException      if configuration commandLineInjector fails.
+   * @throws InjectionException if configuration commandLineInjector fails.
+   * @throws IOException        error reading the configuration.
+   */
+  private static Configuration getClientConfiguration(final String[] args)
+      throws BindException, InjectionException, IOException {
+    final Configuration commandLineConf = parseCommandLine(args);
+
+    final Configuration clientConf = ClientConfiguration.CONF
+        .set(ClientConfiguration.ON_JOB_RUNNING, SuspendClient.RunningJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_FAILED, SuspendClient.FailedJobHandler.class)
+        .set(ClientConfiguration.ON_JOB_COMPLETED, SuspendClient.CompletedJobHandler.class)
+        .set(ClientConfiguration.ON_RUNTIME_ERROR, SuspendClient.RuntimeErrorHandler.class)
+        .build();
+
+    // TODO: Remove the injector, have stuff injected.
+    final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+
+    final Configuration runtimeConf;
+    if (!isLocal) {
+      LOG.log(Level.INFO, "Running on YARN");
+      runtimeConf = YarnClientConfiguration.CONF.build();
+    } else {
+      LOG.log(Level.INFO, "Running on the local runtime");
+      runtimeConf = LocalRuntimeConfiguration.CONF
+          .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+          .build();
+    }
+
+    final Configuration parametersConf = cloneCommandLineConfiguration(commandLineConf);
+    return Tang.Factory.getTang()
+        .newConfigurationBuilder(runtimeConf, clientConf, parametersConf)
+        .build();
+  }
+
+  /**
+   * Main method that runs the example.
+   * Failures are logged rather than propagated to the caller.
+   *
+   * @param args command line parameters.
+   */
+  public static void main(final String[] args) {
+    try {
+      final Configuration config = getClientConfiguration(args);
+
+      LOG.log(Level.INFO, "Configuration:\n--\n{0}--",
+          new AvroConfigurationSerializer().toString(config));
+
+      final SuspendClient client =
+          Tang.Factory.getTang().newInjector(config).getInstance(SuspendClient.class);
+
+      client.submit();
+      client.waitForCompletion();
+      LOG.info("Done!");
+
+    } catch (final BindException | IOException | InjectionException ex) {
+      LOG.log(Level.SEVERE, "Cannot launch: configuration error", ex);
+    } catch (final Exception ex) {
+      LOG.log(Level.SEVERE, "Cleanup error", ex);
+    }
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter: number of iterations to run.
+   */
+  @NamedParameter(doc = "Number of iterations to run", short_name = "cycles", default_value = "20")
+  public static final class NumCycles implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter: delay in seconds for each cycle.
+   */
+  @NamedParameter(doc = "Delay in seconds between the cycles", short_name = "delay", default_value = "1")
+  public static final class Delay implements Name<Integer> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java
new file mode 100644
index 0000000..72b257a
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/ObjectWritableCodec.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
+
+import java.io.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Codec that serializes Hadoop Writable objects to byte arrays and back.
+ *
+ * @param <T> Class derived from Hadoop Writable.
+ */
+public class ObjectWritableCodec<T extends Writable> implements Codec<T> {
+
+  /**
+   * Standard Java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(ObjectWritableCodec.class.getName());
+
+  /**
+   * Class of the decoded objects; decode() instantiates it via newInstance().
+   */
+  private final Class<? extends T> writableClass;
+
+  /**
+   * Create a new codec for Hadoop Writables.
+   *
+   * @param clazz class that decode() instantiates before reading the fields into it.
+   */
+  public ObjectWritableCodec(final Class<? extends T> clazz) {
+    this.writableClass = clazz;
+  }
+
+  /**
+   * Encodes Hadoop Writable object into a byte array
+   * by letting the object write itself into an in-memory stream.
+   *
+   * @param writable the object to encode.
+   * @return serialized object as byte array.
+   * @throws RemoteRuntimeException if serialization fails.
+   */
+  @Override
+  public byte[] encode(final T writable) {
+    try (final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+         final DataOutputStream dataStream = new DataOutputStream(byteStream)) {
+      writable.write(dataStream);
+      return byteStream.toByteArray();
+    } catch (final IOException ex) {
+      LOG.log(Level.SEVERE, "Cannot encode object " + writable, ex);
+      throw new RemoteRuntimeException(ex);
+    }
+  }
+
+  /**
+   * Decode Hadoop Writable object from a byte array: a new instance of the
+   * configured class is created and its fields are read from the buffer.
+   *
+   * @param buffer serialized version of the Writable object (as a byte array).
+   * @return a Writable object.
+   * @throws RemoteRuntimeException if deserialization fails.
+   */
+  @Override
+  public T decode(final byte[] buffer) {
+    try (final ByteArrayInputStream byteStream = new ByteArrayInputStream(buffer);
+         final DataInputStream dataStream = new DataInputStream(byteStream)) {
+      final T decoded = this.writableClass.newInstance();
+      decoded.readFields(dataStream);
+      return decoded;
+    } catch (final IOException | InstantiationException | IllegalAccessException ex) {
+      LOG.log(Level.SEVERE, "Cannot decode class " + this.writableClass, ex);
+      throw new RemoteRuntimeException(ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
new file mode 100644
index 0000000..e57e9b5
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClient.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.client.*;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Client of the suspend/resume example: builds the driver configuration,
+ * submits it to REEF and blocks until one of its job handlers reports
+ * that the job completed, the job failed, or the runtime failed.
+ */
+@Unit
+public class SuspendClient {
+
+  /**
+   * Standard java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(SuspendClient.class.getName());
+
+  /**
+   * Job Driver configuration.
+   */
+  private final Configuration driverConfig;
+
+  /**
+   * Reference to the REEF framework.
+   */
+  private final REEF reef;
+
+  /**
+   * Controller that listens for suspend/resume commands on a specified port.
+   */
+  private final SuspendClientControl controlListener;
+
+  /**
+   * Set to true once a handler has reported the end of the job.
+   * Read and written only while holding the monitor of this object.
+   */
+  private boolean isJobDone = false;
+
+  /**
+   * @param reef      reference to the REEF framework.
+   * @param port      port to listen to for suspend/resume commands.
+   * @param numCycles number of cycles to run in the task.
+   * @param delay     delay in seconds between cycles in the task.
+   * @throws BindException if raised while assembling the driver configuration
+   *                       or creating the control listener.
+   * @throws IOException   if raised while assembling the driver configuration
+   *                       or creating the control listener.
+   */
+  @Inject
+  SuspendClient(
+      final REEF reef,
+      final @Parameter(SuspendClientControl.Port.class) int port,
+      final @Parameter(Launch.NumCycles.class) int numCycles,
+      final @Parameter(Launch.Delay.class) int delay) throws BindException, IOException {
+
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(Launch.NumCycles.class, Integer.toString(numCycles))
+        .bindNamedParameter(Launch.Delay.class, Integer.toString(delay));
+
+    cb.addConfiguration(DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(SuspendDriver.class))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "suspend-" + System.currentTimeMillis())
+        .set(DriverConfiguration.ON_TASK_RUNNING, SuspendDriver.RunningTaskHandler.class)
+        .set(DriverConfiguration.ON_TASK_COMPLETED, SuspendDriver.CompletedTaskHandler.class)
+        .set(DriverConfiguration.ON_TASK_SUSPENDED, SuspendDriver.SuspendedTaskHandler.class)
+        .set(DriverConfiguration.ON_TASK_MESSAGE, SuspendDriver.TaskMessageHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, SuspendDriver.AllocatedEvaluatorHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, SuspendDriver.ActiveContextHandler.class)
+        .set(DriverConfiguration.ON_CLIENT_MESSAGE, SuspendDriver.ClientMessageHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STARTED, SuspendDriver.StartHandler.class)
+        .set(DriverConfiguration.ON_DRIVER_STOP, SuspendDriver.StopHandler.class)
+        .build());
+
+    this.driverConfig = cb.build();
+    this.reef = reef;
+    this.controlListener = new SuspendClientControl(port);
+  }
+
+  /**
+   * Start the job driver.
+   */
+  public void submit() {
+    LOG.info("Start the job driver");
+    this.reef.submit(this.driverConfig);
+  }
+
+  /**
+   * Wait for the job to complete, then close REEF and the control listener.
+   * If the waiting thread is interrupted, a warning is logged and the
+   * cleanup still runs.
+   *
+   * @throws Exception if closing REEF or the control listener fails.
+   */
+  public void waitForCompletion() throws Exception {
+    LOG.info("Waiting for the Job Driver to complete.");
+    try {
+      synchronized (this) {
+        // Wait on the flag rather than on a bare notification: this survives
+        // spurious wakeups and a handler that fired before we got here.
+        while (!this.isJobDone) {
+          this.wait();
+        }
+      }
+    } catch (final InterruptedException ex) {
+      LOG.log(Level.WARNING, "Waiting for result interrupted.", ex);
+    }
+    this.reef.close();
+    this.controlListener.close();
+  }
+
+  /**
+   * Record that the job has ended and wake up the thread blocked in
+   * {@link #waitForCompletion()}. Synchronized on this instance, i.e. on
+   * the same monitor that waitForCompletion() waits on.
+   */
+  private synchronized void markJobDone() {
+    this.isJobDone = true;
+    this.notifyAll();
+  }
+
+  /**
+   * Receive notification from the driver that the job is about to run.
+   * RunningJob object is a proxy to the running job driver that can be used for sending messages.
+   */
+  final class RunningJobHandler implements EventHandler<RunningJob> {
+    @Override
+    public void onNext(final RunningJob job) {
+      LOG.log(Level.INFO, "Running job: {0}", job.getId());
+      SuspendClient.this.controlListener.setRunningJob(job);
+    }
+  }
+
+  /**
+   * Receive notification from the driver that the job had failed.
+   * <p>
+   * FailedJob is a proxy for the failed job driver
+   * (contains job ID and exception thrown from the driver).
+   */
+  final class FailedJobHandler implements EventHandler<FailedJob> {
+    @Override
+    public void onNext(final FailedJob job) {
+      LOG.log(Level.SEVERE, "Failed job: " + job.getId(), job.getReason().orElse(null));
+      SuspendClient.this.markJobDone();
+    }
+  }
+
+  /**
+   * Receive notification from the driver that the job had completed successfully.
+   */
+  final class CompletedJobHandler implements EventHandler<CompletedJob> {
+    @Override
+    public void onNext(final CompletedJob job) {
+      LOG.log(Level.INFO, "Completed job: {0}", job.getId());
+      SuspendClient.this.markJobDone();
+    }
+  }
+
+  /**
+   * Receive notification that there was an exception thrown from the job driver.
+   */
+  final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
+    @Override
+    public void onNext(final FailedRuntime error) {
+      LOG.log(Level.SEVERE, "ERROR: " + error, error.getReason().orElse(null));
+      // Must notify under the monitor of the client instance; locking the
+      // class object while notifying the instance is illegal.
+      SuspendClient.this.markJobDone();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
new file mode 100644
index 0000000..accb5a8
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendClientControl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.client.RunningJob;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * (Wake) listener to get suspend/resume commands from Control process.
+ */
+public class SuspendClientControl implements AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(SuspendClientControl.class.getName());
+  private static final ObjectSerializableCodec<byte[]> CODEC = new ObjectSerializableCodec<>();
+
+  /**
+   * Transport created on the control port; closed in close().
+   */
+  private final Transport transport;
+
+  /**
+   * Job that control messages are forwarded to; null until setRunningJob() is called.
+   * Every read and write happens while holding the lock on this SuspendClientControl.
+   */
+  private RunningJob runningJob;
+
+  /**
+   * Create the control listener on the given port of localhost.
+   *
+   * @param port port to listen to for suspend/resume control commands.
+   * @throws IOException declared by this constructor for listener setup failures.
+   */
+  @Inject
+  public SuspendClientControl(
+      final @Parameter(SuspendClientControl.Port.class) int port) throws IOException {
+
+    // Pass the port as a string: a numeric MessageFormat argument is rendered
+    // with locale-specific digit grouping (e.g. "7,008").
+    LOG.log(Level.INFO, "Listen to control port {0}", Integer.toString(port));
+
+    final EStage<TransportEvent> stage = new ThreadPoolStage<>(
+        "suspend-control-server", new ControlMessageHandler(), 1, new EventHandler<Throwable>() {
+      @Override
+      public void onNext(final Throwable throwable) {
+        throw new RuntimeException(throwable);
+      }
+    });
+
+    this.transport = new NettyMessagingTransport("localhost", port, stage, stage, 1, 10000);
+  }
+
+  /**
+   * Set the job that subsequent control messages are forwarded to.
+   *
+   * @param job running job to send the control messages to.
+   */
+  public synchronized void setRunningJob(final RunningJob job) {
+    this.runningJob = job;
+  }
+
+  /**
+   * Close the underlying transport.
+   */
+  @Override
+  public void close() throws Exception {
+    this.transport.close();
+  }
+
+  @NamedParameter(doc = "Port for suspend/resume control commands",
+      short_name = "port", default_value = "7008")
+  public static final class Port implements Name<Integer> {
+  }
+
+  /**
+   * Forward remote message to the job driver.
+   */
+  private class ControlMessageHandler implements EventHandler<TransportEvent> {
+    /**
+     * Log the message and pass its raw bytes to the running job, if one is set.
+     * Messages that arrive before a job is set are logged and dropped.
+     *
+     * @param msg control message received by the transport.
+     */
+    @Override
+    public void onNext(final TransportEvent msg) {
+      // Lock the enclosing instance, i.e. the same monitor setRunningJob() takes,
+      // so that the job set by another thread is guaranteed to be visible here.
+      synchronized (SuspendClientControl.this) {
+        LOG.log(Level.INFO, "Control message: {0} destination: {1}",
+            new Object[]{CODEC.decode(msg.getData()), runningJob});
+        if (runningJob != null) {
+          runningJob.send(msg.getData());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java
new file mode 100644
index 0000000..38d7542
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendDriver.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.checkpoint.fs.FSCheckPointServiceConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import javax.xml.bind.DatatypeConverter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Suspend/resume example job driver. Execute a simple task in all evaluators,
+ * and handle the suspend/resume commands received from the client.
+ */
+@Unit
+public class SuspendDriver {
+
+  /**
+   * Standard Java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(SuspendDriver.class.getName());
+
+  /**
+   * Number of evaluators to request
+   */
+  private static final int NUM_EVALUATORS = 2;
+
+  /**
+   * String codec is used to encode the results driver sends to the client.
+   */
+  private static final ObjectSerializableCodec<String> CODEC_STR = new ObjectSerializableCodec<>();
+
+  /**
+   * Integer codec is used to decode the results driver gets from the tasks.
+   */
+  private static final ObjectSerializableCodec<Integer> CODEC_INT = new ObjectSerializableCodec<>();
+
+  /**
+   * Job observer on the client.
+   * We use it to send results from the driver back to the client.
+   */
+  private final JobMessageObserver jobMessageObserver;
+
+  /**
+   * Job driver uses EvaluatorRequestor to request Evaluators that will run the Tasks.
+   */
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  /**
+   * TANG Configuration of the Task.
+   */
+  private final Configuration contextConfig;
+
+  /**
+   * Map from task ID (a string) to the RunningTask instance (that can be suspended).
+   */
+  private final Map<String, RunningTask> runningTasks =
+      Collections.synchronizedMap(new HashMap<String, RunningTask>());
+
+  /**
+   * Map from task ID (a string) to the SuspendedTask instance (that can be resumed).
+   */
+  private final Map<String, SuspendedTask> suspendedTasks = new HashMap<>();
+
+  /**
+   * Job driver constructor.
+   * All parameters are injected from TANG automatically.
+   *
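+   * @param jobMessageObserver is used to send messages to the client.
+   * @param evaluatorRequestor is used to request Evaluators.
+   * @param isLocal            value of the IS_LOCAL option of the checkpoint service.
+   * @param numCycles          number of cycles to run in the task.
+   * @param delay              delay in seconds between cycles in the task.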
+   */
+  @Inject
+  SuspendDriver(
+      final JobMessageObserver jobMessageObserver,
+      final EvaluatorRequestor evaluatorRequestor,
+      final @Parameter(Launch.Local.class) boolean isLocal,
+      final @Parameter(Launch.NumCycles.class) int numCycles,
+      final @Parameter(Launch.Delay.class) int delay) {
+
+    this.jobMessageObserver = jobMessageObserver;
+    this.evaluatorRequestor = evaluatorRequestor;
+
+    // Named parameters are bound by their string representation.
+    final String cyclesValue = Integer.toString(numCycles);
+    final String delayValue = Integer.toString(delay);
+
+    try {
+      // Settings of the checkpoint service that is added to every context.
+      final Configuration checkpointConf = FSCheckPointServiceConfiguration.CONF
+          .set(FSCheckPointServiceConfiguration.IS_LOCAL, Boolean.toString(isLocal))
+          .set(FSCheckPointServiceConfiguration.PATH, "/tmp")
+          .set(FSCheckPointServiceConfiguration.PREFIX, "reef-checkpoint-")
+          .set(FSCheckPointServiceConfiguration.REPLICATION_FACTOR, "3")
+          .build();
+
+      // Task parameters plus the checkpoint service make up the context configuration.
+      final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder()
+          .bindNamedParameter(Launch.NumCycles.class, cyclesValue)
+          .bindNamedParameter(Launch.Delay.class, delayValue);
+      builder.addConfiguration(checkpointConf);
+
+      this.contextConfig = builder.build();
+    } catch (final BindException bindError) {
+      throw new RuntimeException(bindError);
+    }
+  }
+
+  /**
+   * Receive notification that the Task is ready to run.
+   */
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    /**
+     * Register the task as running and tell the client that it has started.
+     *
+     * @param task the task that is now running.
+     */
+    @Override
+    public final void onNext(final RunningTask task) {
+      final String taskId = task.getId();
+      LOG.log(Level.INFO, "Running task: {0}", taskId);
+      runningTasks.put(taskId, task);
+      final byte[] notice = CODEC_STR.encode("start task: " + taskId);
+      jobMessageObserver.sendMessageToClient(notice);
+    }
+  }
+
+  /**
+   * Receive notification that the Task has completed successfully.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    /**
+     * Report the completion to the client, drop the task from the running set,
+     * close its context, and log how many tasks are still running or suspended.
+     *
+     * @param task the task that has completed.
+     */
+    @Override
+    public final void onNext(final CompletedTask task) {
+
+      final ActiveContext context = task.getActiveContext();
+      final EvaluatorDescriptor node = context.getEvaluatorDescriptor();
+      final String taskId = task.getId();
+
+      final String msg = "Task completed " + taskId + " on node " + node;
+      LOG.info(msg);
+      jobMessageObserver.sendMessageToClient(CODEC_STR.encode(msg));
+
+      runningTasks.remove(taskId);
+      context.close();
+
+      // suspendedTasks is a plain HashMap, so it is only touched under its own lock.
+      final boolean allDone;
+      synchronized (suspendedTasks) {
+        LOG.log(Level.INFO, "Tasks running: {0} suspended: {1}", new Object[]{
+            runningTasks.size(), suspendedTasks.size()});
+        allDone = runningTasks.isEmpty() && suspendedTasks.isEmpty();
+      }
+
+      // Only a log line is produced here; no shutdown is triggered by this handler.
+      if (allDone) {
+        LOG.info("All tasks completed; shutting down.");
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Task has been suspended.
+   */
+  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+    /**
+     * Move the task from the running set to the suspended set and tell the client.
+     *
+     * @param task the task that has been suspended.
+     */
+    @Override
+    public final void onNext(final SuspendedTask task) {
+
+      final String taskId = task.getId();
+      final String msg = "Task suspended: " + taskId;
+      LOG.info(msg);
+
+      // Both maps are updated under the suspendedTasks lock.
+      synchronized (suspendedTasks) {
+        suspendedTasks.put(taskId, task);
+        runningTasks.remove(taskId);
+      }
+
+      final byte[] notice = CODEC_STR.encode(msg);
+      jobMessageObserver.sendMessageToClient(notice);
+    }
+  }
+
+  /**
+   * Receive message from the Task.
+   */
+  final class TaskMessageHandler implements EventHandler<TaskMessage> {
+    /**
+     * Decode the integer carried by the task message, log it, and relay it to the client.
+     *
+     * @param message message sent by a task.
+     */
+    @Override
+    public void onNext(final TaskMessage message) {
+      final int result = CODEC_INT.decode(message.get());
+      final String msg = "Task message " + message.getId() + ": " + result;
+      LOG.info(msg);
+      final byte[] relayed = CODEC_STR.encode(msg);
+      jobMessageObserver.sendMessageToClient(relayed);
+    }
+  }
+
+  /**
+   * Receive notification that an Evaluator has been allocated,
+   * and submit a new Context to that Evaluator.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    /**
+     * Submit a context to the evaluator. The context is identified by the
+     * evaluator ID followed by "_context" and also carries the shared contextConfig.
+     *
+     * @param eval the newly allocated evaluator.
+     */
+    @Override
+    public void onNext(final AllocatedEvaluator eval) {
+      final String evaluatorId = eval.getId();
+      LOG.log(Level.INFO, "Allocated Evaluator: {0}", evaluatorId);
+      try {
+        final Configuration identifierConf = ContextConfiguration.CONF
+            .set(ContextConfiguration.IDENTIFIER, evaluatorId + "_context")
+            .build();
+        final Configuration mergedConf = Tang.Factory.getTang()
+            .newConfigurationBuilder(identifierConf, contextConfig)
+            .build();
+        eval.submitContext(mergedConf);
+      } catch (final BindException bindError) {
+        throw new RuntimeException(bindError);
+      }
+    }
+  }
+
+  /**
+   * Receive notification that a new Context is available.
+   * Submit a new Task to that Context.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    /**
+     * Submit a SuspendTestTask, together with its suspend handler, to the context.
+     * The task is identified by the context ID followed by "_task".
+     *
+     * @param context the context that became active.
+     */
+    @Override
+    public synchronized void onNext(final ActiveContext context) {
+      final String contextId = context.getId();
+      LOG.log(Level.INFO, "Active Context: {0}", contextId);
+      try {
+        final Configuration taskConf = TaskConfiguration.CONF
+            .set(TaskConfiguration.IDENTIFIER, contextId + "_task")
+            .set(TaskConfiguration.TASK, SuspendTestTask.class)
+            .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
+            .build();
+        context.submitTask(taskConf);
+      } catch (final BindException bindError) {
+        LOG.log(Level.SEVERE, "Bad Task configuration for context: " + contextId, bindError);
+        throw new RuntimeException(bindError);
+      }
+    }
+  }
+
+  /**
+   * Handle notifications from the client.
+   */
+  final class ClientMessageHandler implements EventHandler<byte[]> {
+
+    /**
+     * Decode a client command of the form {@code command taskId} and execute it.
+     * The supported commands are "suspend" and "resume"; the command is matched
+     * case-insensitively.
+     *
+     * @param message command string serialized with CODEC_STR.
+     * @throws IllegalArgumentException if the message does not consist of a command
+     *                                  and a task ID, if the command is unknown,
+     *                                  or if no matching task is found.
+     */
+    @Override
+    public void onNext(final byte[] message) {
+
+      final String commandStr = CODEC_STR.decode(message);
+      LOG.log(Level.INFO, "Client message: {0}", commandStr);
+
+      // Trim first, so that stray leading/trailing whitespace neither yields
+      // an empty command token nor becomes a part of the task ID.
+      final String[] split = commandStr.trim().split("\\s+", 2);
+      if (split.length != 2) {
+        throw new IllegalArgumentException("Bad command: " + commandStr);
+      }
+
+      // Lower-case independently of the default locale. A switch over strings
+      // compares with equals(), so the command does not have to be interned.
+      final String command = split[0].toLowerCase(java.util.Locale.ROOT);
+      final String taskId = split[1];
+
+      switch (command) {
+        case "suspend":
+          suspendTask(taskId);
+          break;
+        case "resume":
+          resumeTask(taskId);
+          break;
+        default:
+          throw new IllegalArgumentException("Bad command: " + command);
+      }
+    }
+
+    /**
+     * Ask the running task with the given ID to suspend.
+     */
+    private void suspendTask(final String taskId) {
+      final RunningTask task = runningTasks.get(taskId);
+      if (task == null) {
+        throw new IllegalArgumentException("Suspend: Task not found: " + taskId);
+      }
+      task.suspend();
+    }
+
+    /**
+     * Resubmit the suspended task with the given ID to its context,
+     * passing the saved state along as the memento.
+     */
+    private void resumeTask(final String taskId) {
+      final SuspendedTask suspendedTask;
+      synchronized (suspendedTasks) {
+        suspendedTask = suspendedTasks.remove(taskId);
+      }
+      if (suspendedTask == null) {
+        throw new IllegalArgumentException("Resume: Task not found: " + taskId);
+      }
+      try {
+        // Bind the same Task implementation and suspend handler as the initial
+        // submission in ActiveContextHandler does. Without them the configuration
+        // names no task class to run, and a resumed task could not be suspended again.
+        suspendedTask.getActiveContext().submitTask(TaskConfiguration.CONF
+            .set(TaskConfiguration.IDENTIFIER, taskId)
+            .set(TaskConfiguration.TASK, SuspendTestTask.class)
+            .set(TaskConfiguration.ON_SUSPEND, SuspendTestTask.SuspendHandler.class)
+            .set(TaskConfiguration.MEMENTO,
+                DatatypeConverter.printBase64Binary(suspendedTask.get()))
+            .build());
+      } catch (final BindException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Job Driver is ready and the clock is set up: request the evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    /**
+     * Request NUM_EVALUATORS evaluators with 128 units of memory and one core each.
+     *
+     * @param time the start event.
+     */
+    @Override
+    public void onNext(final StartTime time) {
+      LOG.log(Level.INFO, "StartTime: {0}", time);
+      final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+          .setMemory(128)
+          .setNumberOfCores(1)
+          .setNumber(NUM_EVALUATORS)
+          .build();
+      evaluatorRequestor.submit(request);
+    }
+  }
+
+  /**
+   * Job driver is shutting down: tell the client that the StopTime event was received.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    /**
+     * Log the stop event and send a "got StopTime" message to the client.
+     *
+     * @param time the stop event.
+     */
+    @Override
+    public void onNext(final StopTime time) {
+      LOG.log(Level.INFO, "StopTime: {0}", time);
+      final byte[] notice = CODEC_STR.encode("got StopTime");
+      jobMessageObserver.sendMessageToClient(notice);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java
new file mode 100644
index 0000000..5eedf1f
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/SuspendTestTask.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.CheckpointService;
+import org.apache.reef.io.checkpoint.CheckpointService.CheckpointReadChannel;
+import org.apache.reef.io.checkpoint.CheckpointService.CheckpointWriteChannel;
+import org.apache.reef.io.checkpoint.fs.FSCheckpointID;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Simple do-nothing task that can send messages to the Driver and can be suspended/resumed.
+ */
+@Unit
+public class SuspendTestTask implements Task, TaskMessageSource {
+
+  /**
+   * Standard java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(SuspendTestTask.class.getName());
+  private final CheckpointService checkpointService;
+  /**
+   * number of cycles to run in the task.
+   */
+  private final int numCycles;
+  /**
+   * delay in milliseconds between cycles in the task.
+   */
+  private final int delay;
+  /**
+   * Codec to serialize/deserialize counter values for the updates.
+   */
+  private final ObjectSerializableCodec<Integer> codecInt = new ObjectSerializableCodec<>();
+  /**
+   * Codec to serialize/deserialize checkpoint IDs for suspend/resume.
+   */
+  private final ObjectWritableCodec<CheckpointID> codecCheckpoint =
+      new ObjectWritableCodec<CheckpointID>(FSCheckpointID.class);
+  /**
+   * Current value of the counter.
+   */
+  private int counter = 0;
+  /**
+   * True if the suspend message has been received, false otherwise.
+   */
+  private boolean suspended = false;
+
+  /**
+   * Task constructor: invoked by TANG.
+   *
+   * @param checkpointService service used to save and restore the counter on suspend/resume.
+   * @param numCycles         number of cycles to run in the task.
+   * @param delay             delay in seconds between cycles in the task.
+   */
+  @Inject
+  public SuspendTestTask(
+      final CheckpointService checkpointService,
+      @Parameter(Launch.NumCycles.class) final int numCycles,
+      @Parameter(Launch.Delay.class) final int delay) {
+    this.checkpointService = checkpointService;
+    this.numCycles = numCycles;
+    this.delay = delay * 1000;
+  }
+
+  /**
+   * Main method of the task: run cycle from 0 to numCycles,
+   * and sleep for delay seconds on each cycle.
+   *
+   * @param memento serialized checkpoint ID to restore the counter from.
+   *                Null or empty array for the initial run.
+   * @return serialized checkpoint ID if the task has been suspended,
+   *         serialized version of the counter otherwise.
+   */
+  @Override
+  public synchronized byte[] call(final byte[] memento) throws IOException, InterruptedException {
+
+    LOG.log(Level.INFO, "Start: {0} counter: {1}/{2}",
+        new Object[]{this, this.counter, this.numCycles});
+
+    // A non-empty memento means the task is being resumed from a checkpoint.
+    final boolean resumed = memento != null && memento.length > 0;
+    if (resumed) {
+      this.restore(memento);
+    }
+
+    this.suspended = false;
+    while (this.counter < this.numCycles && !this.suspended) {
+      try {
+        LOG.log(Level.INFO, "Run: {0} counter: {1}/{2} sleep: {3}",
+            new Object[]{this, this.counter, this.numCycles, this.delay});
+        // wait() releases the monitor for the duration of the delay.
+        this.wait(this.delay);
+      } catch (final InterruptedException interrupted) {
+        LOG.log(Level.INFO, "{0} interrupted. counter: {1}: {2}",
+            new Object[]{this, this.counter, interrupted});
+      }
+      ++this.counter;
+    }
+
+    // Suspended: hand back the checkpoint ID. Finished: hand back the counter.
+    if (this.suspended) {
+      return this.save();
+    }
+    return this.codecInt.encode(this.counter);
+  }
+
+  /**
+   * Update driver on current state of the task.
+   *
+   * @return serialized version of the counter.
+   */
+  @Override
+  public synchronized Optional<TaskMessage> getMessage() {
+    LOG.log(Level.INFO, "Message from Task {0} to the Driver: counter: {1}",
+        new Object[]{this, this.counter});
+    // The message source is the class name; the payload is the encoded counter.
+    final byte[] encodedCounter = this.codecInt.encode(this.counter);
+    final TaskMessage update = TaskMessage.from(SuspendTestTask.class.getName(), encodedCounter);
+    return Optional.of(update);
+  }
+
+  /**
+   * Save current state of the task in the checkpoint.
+   *
+   * @return checkpoint ID (serialized)
+   */
+  private synchronized byte[] save() throws IOException, InterruptedException {
+    try (final CheckpointWriteChannel sink = this.checkpointService.create()) {
+      // Write the encoded counter, commit the checkpoint, and return its encoded ID.
+      final byte[] counterBytes = this.codecInt.encode(this.counter);
+      sink.write(ByteBuffer.wrap(counterBytes));
+      final CheckpointID committedId = this.checkpointService.commit(sink);
+      return this.codecCheckpoint.encode(committedId);
+    }
+  }
+
+  /**
+   * Restore the task state from the given checkpoint.
+   *
+   * @param memento serialized checkpoint ID
+   */
+  private synchronized void restore(final byte[] memento) throws IOException, InterruptedException {
+    final CheckpointID savedId = this.codecCheckpoint.decode(memento);
+    try (final CheckpointReadChannel source = this.checkpointService.open(savedId)) {
+      // The read buffer is sized by encoding the current counter value.
+      final byte[] template = this.codecInt.encode(this.counter);
+      final ByteBuffer buffer = ByteBuffer.wrap(template);
+      source.read(buffer);
+      this.counter = this.codecInt.decode(buffer.array());
+    }
+    // The checkpoint is deleted once it has been read.
+    this.checkpointService.delete(savedId);
+  }
+
+  /**
+   * Handler of the suspend request: stops the cycle loop of the enclosing task.
+   */
+  public class SuspendHandler implements EventHandler<SuspendEvent> {
+
+    /**
+     * Mark the task as suspended and wake up the thread waiting in call().
+     *
+     * @param suspendEvent the suspend request; its message is optional.
+     */
+    @Override
+    public void onNext(final SuspendEvent suspendEvent) {
+      // The message is optional, so do not dereference it unconditionally.
+      final byte[] message = suspendEvent.get().orElse(null);
+      final int messageLength = message == null ? 0 : message.length;
+
+      // notify() may only be called by the owner of the task's monitor; without
+      // this lock it fails with IllegalMonitorStateException. call() releases the
+      // monitor while it waits, so the lock can be taken here. Setting the flag
+      // under the same lock also makes it visible to the loop in call().
+      synchronized (SuspendTestTask.this) {
+        LOG.log(Level.INFO, "Suspend: {0} with: {1} bytes; counter: {2}",
+            new Object[]{this, messageLength, SuspendTestTask.this.counter});
+        SuspendTestTask.this.suspended = true;
+        SuspendTestTask.this.notify();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java
new file mode 100644
index 0000000..9017daa
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/suspend/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * suspend/resume demo.
+ */
+package org.apache.reef.examples.suspend;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
new file mode 100644
index 0000000..e6a5d6c
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/BlockingEventHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.utils.wake;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An EventHandler that blocks until a set number of Events has been received.
+ * Once they have been received, the downstream event handler is called with an
+ * Iterable of the events spooled.
+ *
+ * @param <T> type of the events.
+ */
+public final class BlockingEventHandler<T> implements EventHandler<T> {
+
+  /**
+   * Number of events to collect before they are handed to the destination.
+   */
+  private final int expectedSize;
+
+  /**
+   * Receiver of each completed batch of events.
+   */
+  private final EventHandler<Iterable<T>> destination;
+
+  /**
+   * Events of the batch that is currently being collected.
+   */
+  private List<T> events = new ArrayList<>();
+
+  /**
+   * @param expectedSize number of events that make up one batch.
+   * @param destination  the event handler that receives each batch.
+   */
+  public BlockingEventHandler(final int expectedSize, final EventHandler<Iterable<T>> destination) {
+    this.destination = destination;
+    this.expectedSize = expectedSize;
+  }
+
+  /**
+   * Add the event to the current batch. Once the batch holds the expected number
+   * of events, pass it to the destination and start a new, empty batch.
+   *
+   * @param event the event to collect.
+   * @throws IllegalStateException if the batch is already complete when the event arrives.
+   */
+  @Override
+  public final void onNext(final T event) {
+    if (this.events.size() >= this.expectedSize) {
+      throw new IllegalStateException("Received more Events than expected");
+    }
+    this.events.add(event);
+    if (this.events.size() < this.expectedSize) {
+      return;
+    }
+    final List<T> batch = this.events;
+    this.destination.onNext(batch);
+    // The delivered list is not reused; collecting restarts with a fresh one.
+    this.events = new ArrayList<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
new file mode 100644
index 0000000..9d2d5b8
--- /dev/null
+++ b/lang/java/reef-examples/src/main/java/org/apache/reef/examples/utils/wake/LoggingEventHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.utils.wake;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An EventHandler that logs its events before handing it to a downstream
+ * EventHandler.
+ *
+ * @param <T> type of the events.
+ */
+public class LoggingEventHandler<T> implements EventHandler<T> {
+
+  private final EventHandler<T> downstreamEventHandler;
+  private final String prefix;
+  private final String suffix;
+
+  /**
+   * @param prefix                 to be logged before the event
+   * @param downstreamEventHandler the event handler to hand the event to
+   * @param suffix                 to be logged after the event
+   */
+  public LoggingEventHandler(final String prefix, EventHandler<T> downstreamEventHandler, final String suffix) {
+    this.downstreamEventHandler = downstreamEventHandler;
+    this.prefix = prefix;
+    this.suffix = suffix;
+  }
+
+  public LoggingEventHandler(final EventHandler<T> downstreamEventHandler) {
+    this("", downstreamEventHandler, "");
+  }
+
+  @Override
+  public void onNext(final T value) {
+    Logger.getLogger(LoggingEventHandler.class.getName()).log(Level.INFO, prefix + value.toString() + suffix);
+    this.downstreamEventHandler.onNext(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java
new file mode 100644
index 0000000..14812ca
--- /dev/null
+++ b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/hello/HelloHttpTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.hello;
+
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.examples.hellohttp.HelloREEFHttp;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Runs the HelloREEFHttp example on the local runtime and checks how it ends.
+ */
+public class HelloHttpTest {
+
+  /**
+   * Timeout handed to the launcher, in milliseconds.
+   */
+  private static final int TIMEOUT_MS = 10 * 1000;
+
+  /**
+   * The example is expected to still be running when the timeout expires,
+   * so the launcher must report that the job was force closed.
+   */
+  @Test
+  public void testHttpServer() throws BindException, InjectionException {
+    final Configuration localRuntime =
+        LocalRuntimeConfiguration.CONF
+            .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, 2)
+            .build();
+
+    // must be force closed by timeout
+    Assert.assertEquals(LauncherStatus.FORCE_CLOSED, HelloREEFHttp.runHelloReef(localRuntime, TIMEOUT_MS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java
new file mode 100644
index 0000000..827d57a
--- /dev/null
+++ b/lang/java/reef-examples/src/test/java/org/apache/reef/examples/suspend/ObjectWritableCodecTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.examples.suspend;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.fs.FSCheckpointID;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip test for ObjectWritableCodec using FSCheckpointID values.
+ */
+public class ObjectWritableCodecTest {
+
+  /**
+   * Codec under test; created once for the whole class.
+   */
+  private static ObjectWritableCodec<CheckpointID> checkpointCodec;
+
+  /**
+   * One-time fixture: instantiate the codec before any test method runs.
+   */
+  @BeforeClass
+  public static void setUpClass() {
+    checkpointCodec = new ObjectWritableCodec<CheckpointID>(FSCheckpointID.class);
+  }
+
+  /**
+   * Encoding a checkpoint ID and decoding the resulting bytes must yield an
+   * object that equals the original.
+   */
+  @Test
+  public void testFSCheckpointIdCodec() {
+    final CheckpointID original = new FSCheckpointID(new Path("path"));
+    final CheckpointID roundTripped = checkpointCodec.decode(checkpointCodec.encode(original));
+    Assert.assertEquals(original, roundTripped);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/pom.xml b/lang/java/reef-io/pom.xml
new file mode 100644
index 0000000..c517e38
--- /dev/null
+++ b/lang/java/reef-io/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.reef</groupId>
+        <artifactId>reef-project</artifactId>
+        <version>0.11.0-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>reef-io</artifactId>
+    <name>REEF IO</name>
+
+    <build>
+        <plugins>
+            <!--
+            Generates Java sources from src/main/proto/ns_protocol.proto during the
+            generate-sources phase. The "protoc" compiler is launched as an external
+            executable, so it has to be available on the build machine.
+            -->
+            <plugin>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>generate-sources</id>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <tasks>
+                                <!-- The output directory is created first, then protoc writes into it. -->
+                                <mkdir dir="target/generated-sources/proto"/>
+                                <exec executable="protoc">
+                                    <arg value="--proto_path=src/main/proto/"/>
+                                    <arg value="--java_out=target/generated-sources/proto"/>
+                                    <arg value="src/main/proto/ns_protocol.proto"/>
+                                </exec>
+                            </tasks>
+                            <sourceRoot>target/generated-sources/proto</sourceRoot>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!--
+            Registers the generated proto and avro directories as additional source
+            roots so that the generated classes are compiled with the module.
+            -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/proto</source>
+                                <source>target/generated-sources/avro</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <!--
+            NOTE(review): no version, executions or configuration are declared here;
+            presumably they are inherited from the parent pom. Confirm there.
+            -->
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <!-- Sibling REEF modules; they share this project's groupId and version. -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-webserver</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!--
+        NOTE(review): avro and junit declare neither version nor scope here;
+        presumably both are managed by the parent pom. Confirm there, in
+        particular that junit ends up with test scope.
+        -->
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <!-- HADOOP -->
+        <!--
+        Every Hadoop artifact below uses the "provided" scope: it is on the compile
+        and test classpath but is expected to be supplied by the environment at
+        run time. All of them share the ${hadoop.version} property.
+        -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn</artifactId>
+            <version>${hadoop.version}</version>
+            <type>pom</type>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- END OF HADOOP -->
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/avro/nameservice.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/avro/nameservice.avsc b/lang/java/reef-io/src/main/avro/nameservice.avsc
new file mode 100644
index 0000000..9a10478
--- /dev/null
+++ b/lang/java/reef-io/src/main/avro/nameservice.avsc
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ [
+{
+    "namespace":"org.apache.reef.io.network.naming.avro",
+    "type":"record",
+    "name":"AvroNamingLookupRequest",
+    "fields":[
+	   {"name":"ids","type":{"type":"array", "items":"string"}}
+    ]
+},
+{
+    "namespace":"org.apache.reef.io.network.naming.avro",
+    "type":"record",
+    "name":"AvroNamingAssignment",
+    "fields":[
+	   {"name":"id","type":"string"},
+	   {"name":"host","type":"string"},
+     {"name":"port","type":"int"}
+    ]
+},
+{
+    "namespace":"org.apache.reef.io.network.naming.avro",
+    "type":"record",
+    "name":"AvroNamingLookupResponse",
+    "fields":[
+	   {"name":"tuples","type":{"type":"array", "items":"AvroNamingAssignment"}}
+    ]
+},
+{
+    "namespace":"org.apache.reef.io.network.naming.avro",
+    "type":"record",
+    "name":"AvroNamingRegisterRequest",
+    "fields":[
+	   {"name":"id","type":"string"},
+	   {"name":"host","type":"string"},
+     {"name":"port","type":"int"}
+    ]
+},
+{
+    "namespace":"org.apache.reef.io.network.naming.avro",
+    "type":"record",
+    "name":"AvroNamingUnRegisterRequest",
+    "fields":[
+	   {"name":"id","type":"string"}
+    ]
+}
+]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
new file mode 100644
index 0000000..632a349
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
+import org.apache.reef.io.network.util.Pair;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SingleThreadStage;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The driver component for the DataLoadingService
+ * Also acts as the central point for resource requests
+ * All the allocated evaluators pass through this and
+ * the ones that need data loading have a context stacked
+ * that enables a task to get access to Data via the
+ * {@link DataSet}.
+ * <p/>
+ * TODO: Add timeouts
+ */
+@DriverSide
+@Unit
+public class DataLoader {
+
+  private static final Logger LOG = Logger.getLogger(DataLoader.class.getName());
+
+  private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap<>();
+  private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue<>();
+  private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>();
+
+  private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0);
+
+  private final DataLoadingService dataLoadingService;
+  private final int dataEvalMemoryMB;
+  private final int dataEvalCore;
+  private final EvaluatorRequest computeRequest;
+  private final SingleThreadStage<EvaluatorRequest> resourceRequestStage;
+  private final ResourceRequestHandler resourceRequestHandler;
+  private final int computeEvalMemoryMB;
+  private final int computeEvalCore;
+  private final EvaluatorRequestor requestor;
+
+  @Inject
+  public DataLoader(
+      final Clock clock,
+      final EvaluatorRequestor requestor,
+      final DataLoadingService dataLoadingService,
+      final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorMemoryMB.class) int dataEvalMemoryMB,
+      final @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorNumberOfCores.class) int dataEvalCore,
+      final @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequest.class) String serializedComputeRequest) {
+
+    // FIXME: Issue #855: We need this alarm to look busy for REEF.
+    clock.scheduleAlarm(30000, new EventHandler<Alarm>() {
+      @Override
+      public void onNext(final Alarm time) {
+        LOG.log(Level.FINE, "Received Alarm: {0}", time);
+      }
+    });
+
+    this.requestor = requestor;
+    this.dataLoadingService = dataLoadingService;
+    this.dataEvalMemoryMB = dataEvalMemoryMB;
+    this.dataEvalCore = dataEvalCore;
+    this.resourceRequestHandler = new ResourceRequestHandler(requestor);
+    this.resourceRequestStage = new SingleThreadStage<>(this.resourceRequestHandler, 2);
+
+    if (serializedComputeRequest.equals("NULL")) {
+      this.computeRequest = null;
+      this.computeEvalMemoryMB = -1;
+      computeEvalCore = 1;
+    } else {
+      this.computeRequest = EvaluatorRequestSerializer.deserialize(serializedComputeRequest);
+      this.computeEvalMemoryMB = this.computeRequest.getMegaBytes();
+      this.computeEvalCore = this.computeRequest.getNumberOfCores();
+      this.numComputeRequestsToSubmit.set(this.computeRequest.getNumber());
+
+      this.resourceRequestStage.onNext(this.computeRequest);
+    }
+
+    this.resourceRequestStage.onNext(getDataLoadingRequest());
+  }
+
+  private EvaluatorRequest getDataLoadingRequest() {
+    return EvaluatorRequest.newBuilder()
+        .setNumber(this.dataLoadingService.getNumberOfPartitions())
+        .setMemory(this.dataEvalMemoryMB)
+        .setNumberOfCores(this.dataEvalCore)
+        .build();
+  }
+
+  public class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      LOG.log(Level.INFO, "StartTime: {0}", startTime);
+      resourceRequestHandler.releaseResourceRequestGate();
+    }
+  }
+
+  public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+
+      final String evalId = allocatedEvaluator.getId();
+      LOG.log(Level.FINEST, "Allocated evaluator: {0}", evalId);
+
+      if (!failedComputeEvalConfigs.isEmpty()) {
+        LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", evalId);
+        final Configuration conf = failedComputeEvalConfigs.poll();
+        if (conf != null) {
+          LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
+          allocatedEvaluator.submitContext(conf);
+          submittedComputeEvalConfigs.put(evalId, conf);
+          return;
+        }
+      }
+
+      if (!failedDataEvalConfigs.isEmpty()) {
+        LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", evalId);
+        final Pair<Configuration, Configuration> confPair = failedDataEvalConfigs.poll();
+        if (confPair != null) {
+          LOG.log(Level.FINE, "Satisfying failed configuration for {0}", evalId);
+          allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
+          submittedDataEvalConfigs.put(evalId, confPair);
+          return;
+        }
+      }
+
+      final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet();
+      LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest);
+
+      if (evaluatorsForComputeRequest >= 0) {
+        try {
+          final Configuration idConfiguration = ContextConfiguration.CONF
+              .set(ContextConfiguration.IDENTIFIER,
+                  dataLoadingService.getComputeContextIdPrefix() + evaluatorsForComputeRequest)
+              .build();
+          LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId);
+          allocatedEvaluator.submitContext(idConfiguration);
+          submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), idConfiguration);
+          if (evaluatorsForComputeRequest == 0) {
+            LOG.log(Level.FINE, "All Compute requests satisfied. Releasing gate");
+            resourceRequestHandler.releaseResourceRequestGate();
+          }
+        } catch (final BindException e) {
+          throw new RuntimeException("Unable to bind context id for Compute request", e);
+        }
+
+      } else {
+
+        final Pair<Configuration, Configuration> confPair = new Pair<>(
+            dataLoadingService.getContextConfiguration(allocatedEvaluator),
+            dataLoadingService.getServiceConfiguration(allocatedEvaluator));
+
+        LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId);
+        allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
+        submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair);
+      }
+    }
+  }
+
+  public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+
+      final String evalId = failedEvaluator.getId();
+
+      final Configuration computeConfig = submittedComputeEvalConfigs.remove(evalId);
+      if (computeConfig != null) {
+
+        LOG.log(Level.INFO, "Received failed compute evaluator: {0}", evalId);
+        failedComputeEvalConfigs.add(computeConfig);
+
+        requestor.submit(EvaluatorRequest.newBuilder()
+            .setMemory(computeEvalMemoryMB).setNumber(1).setNumberOfCores(computeEvalCore).build());
+
+      } else {
+
+        final Pair<Configuration, Configuration> confPair = submittedDataEvalConfigs.remove(evalId);
+        if (confPair != null) {
+
+          LOG.log(Level.INFO, "Received failed data evaluator: {0}", evalId);
+          failedDataEvalConfigs.add(confPair);
+
+          requestor.submit(EvaluatorRequest.newBuilder()
+              .setMemory(dataEvalMemoryMB).setNumber(1).setNumberOfCores(dataEvalCore).build());
+
+        } else {
+
+          LOG.log(Level.SEVERE, "Received unknown failed evaluator " + evalId,
+              failedEvaluator.getEvaluatorException());
+
+          throw new RuntimeException("Received failed evaluator that I did not submit: " + evalId);
+        }
+      }
+    }
+  }
+}