You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:19 UTC

[46/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
new file mode 100644
index 0000000..b2e0083
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -0,0 +1,724 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.javabridge.*;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.CLRBufferedLogHandler;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.apache.reef.webserver.*;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generic job driver for CLRBridge.
+ */
+@Unit
+public final class JobDriver {
+
+  private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
+  /**
+   * Codec used to serialize String messages (task results, suspension notices)
+   * before they are sent to the client through the JobMessageObserver.
+   */
+  private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
+  /**
+   * Logger object handed to the NativeInterop calls alongside the bridge objects.
+   */
+  private final InteropLogger interopLogger = new InteropLogger();
+  /**
+   * Injected name server; within this class only its port is read, to build nameServerInfo.
+   */
+  private final NameServer nameServer;
+  /**
+   * "address:port" string for the name server, built once in the constructor
+   * and passed to every AllocatedEvaluatorBridge.
+   */
+  private final String nameServerInfo;
+  /**
+   * HTTP server whose port is reported to the CLR at start-up and which receives
+   * one handler per URI specification returned by the CLR.
+   */
+  private final HttpServer httpServer;
+  /**
+   * Wake clock; used here to schedule the alarms that wait for the CLR bridge to be set up.
+   */
+  private final Clock clock;
+  /**
+   * Job observer on the client.
+   * We use it to send results from the driver back to the client.
+   */
+  private final JobMessageObserver jobMessageObserver;
+  /**
+   * Job driver uses EvaluatorRequestor
+   * to request Evaluators that will run the Tasks.
+   * It is wrapped into the bridge objects that are passed to the CLR.
+   */
+  private final EvaluatorRequestor evaluatorRequestor;
+
+  /**
+   * Driver status manager to monitor driver status.
+   * NOTE(review): stored by the constructor but not referenced anywhere else in this class.
+   */
+  private final DriverStatusManager driverStatusManager;
+
+  /**
+   * Loader for the CLR libraries; loadLib() is called from setupBridge().
+   */
+  private final LibLoader libLoader;
+
+  /**
+   * Shell execution results from each Evaluator.
+   * NOTE(review): never read or written in this class - candidate for removal.
+   */
+  private final List<String> results = new ArrayList<>();
+  /**
+   * Map from context ID to running evaluator context.
+   * Entries are added when a context becomes active and removed when it closes or fails.
+   */
+  private final Map<String, ActiveContext> contexts = new HashMap<>();
+
+  /**
+   * Logging scope factory that provides LoggingScope
+   */
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  // Handles of the CLR-side event handlers, filled in by setupBridge().
+  // A value of 0 means the CLR did not register a handler for that event.
+  private long evaluatorRequestorHandler = 0;
+  private long allocatedEvaluatorHandler = 0;
+  private long activeContextHandler = 0;
+  private long taskMessageHandler = 0;
+  private long failedTaskHandler = 0;
+  private long failedEvaluatorHandler = 0;
+  private long httpServerEventHandler = 0;
+  private long completedTaskHandler = 0;
+  private long runningTaskHandler = 0;
+  private long suspendedTaskHandler = 0;
+  private long completedEvaluatorHandler = 0;
+  private long closedContextHandler = 0;
+  private long failedContextHandler = 0;
+  private long contextMessageHandler = 0;
+  private long driverRestartHandler = 0;
+  private long driverRestartActiveContextHandler = 0;
+  private long driverRestartRunningTaskHandler = 0;
+  // Both flags are written while holding the JobDriver monitor but are polled from
+  // Alarm callbacks that do not hold it, so they are volatile to guarantee that the
+  // polling thread observes the update. clrBridgeSetup is set after all handles above
+  // have been assigned, so a reader that sees it as true also sees those handles.
+  private volatile boolean clrBridgeSetup = false;
+  private volatile boolean isRestarted = false;
+
+  /**
+   * Job driver constructor.
+   * All parameters are injected from TANG automatically.
+   *
+   * @param clock               Wake clock used to schedule alarms.
+   * @param httpServer          HTTP server that receives the handlers requested by the CLR.
+   * @param nameServer          name server whose port is advertised to allocated Evaluators.
+   * @param jobMessageObserver  is used to send messages back to the client.
+   * @param evaluatorRequestor  is used to request Evaluators.
+   * @param driverStatusManager driver status manager.
+   * @param loggingScopeFactory factory for the logging scopes opened by the handlers.
+   * @param libLoader           loader for the CLR libraries.
+   */
+  @Inject
+  JobDriver(final Clock clock,
+            final HttpServer httpServer,
+            final NameServer nameServer,
+            final JobMessageObserver jobMessageObserver,
+            final EvaluatorRequestor evaluatorRequestor,
+            final DriverStatusManager driverStatusManager,
+            final LoggingScopeFactory loggingScopeFactory,
+            final LibLoader libLoader) {
+    // Name server first, so the advertised "address:port" string can be derived from it.
+    this.nameServer = nameServer;
+    this.nameServerInfo = NetUtils.getLocalAddress() + ":" + nameServer.getPort();
+
+    this.clock = clock;
+    this.httpServer = httpServer;
+    this.libLoader = libLoader;
+    this.loggingScopeFactory = loggingScopeFactory;
+    this.driverStatusManager = driverStatusManager;
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.jobMessageObserver = jobMessageObserver;
+  }
+
+  /**
+   * Sets up the bridge to the CLR: loads the CLR libraries, asks the CLR for the
+   * handles of its event handlers and registers one HTTP handler per URI
+   * specification returned by the CLR. clrBridgeSetup is set once all of this succeeded.
+   *
+   * @param startTime the driver (re)start event; its string form is passed to the CLR.
+   * @throws RuntimeException if the CLR libraries cannot be loaded, or if the CLR
+   *                          returns a number of handlers different from NativeInterop.nHandlers.
+   */
+  private void setupBridge(final StartTime startTime) {
+    // Signal to the clr buffered log handler that the driver has started and that
+    // we can begin logging
+    LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
+    try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
+
+      try {
+        libLoader.loadLib();
+      } catch (IOException e) {
+        // Keep the IOException as the cause so the reason for the failure is not lost.
+        throw new RuntimeException("Fail to load CLR libraries", e);
+      }
+
+      final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
+      if (handler == null) {
+        LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
+      } else {
+        handler.setDriverInitialized();
+        LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
+      }
+
+      LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime});
+      String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort()));
+      long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString(), portNumber);
+      if (handlers != null) {
+        if (handlers.length != NativeInterop.nHandlers) {
+          throw new RuntimeException(
+              String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers",
+                  String.valueOf(handlers.length),
+                  String.valueOf(NativeInterop.nHandlers)));
+        }
+        // NativeInterop.Handlers maps each handler key to its index in the returned array.
+        this.evaluatorRequestorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.EvaluatorRequestorKey)];
+        this.allocatedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.AllocatedEvaluatorKey)];
+        this.activeContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ActiveContextKey)];
+        this.taskMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.TaskMessageKey)];
+        this.failedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedTaskKey)];
+        this.failedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedEvaluatorKey)];
+        this.httpServerEventHandler = handlers[NativeInterop.Handlers.get(NativeInterop.HttpServerKey)];
+        this.completedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedTaskKey)];
+        this.runningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.RunningTaskKey)];
+        this.suspendedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.SuspendedTaskKey)];
+        this.completedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedEvaluatorKey)];
+        this.closedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ClosedContextKey)];
+        this.failedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedContextKey)];
+        this.contextMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ContextMessageKey)];
+        this.driverRestartHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartKey)];
+        this.driverRestartActiveContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartActiveContextKey)];
+        this.driverRestartRunningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartRunningTaskKey)];
+      }
+
+      try (final LoggingScope lp = this.loggingScopeFactory.getNewLoggingScope("setupBridge::ClrSystemHttpServerHandlerOnNext")) {
+        // Ask the CLR which URI specifications it wants to serve ("SPEC" request),
+        // then register one Java-side handler per ':'-separated specification.
+        final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
+        NativeInterop.ClrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, this.interopLogger);
+        final String specList = httpServerEventBridge.getUriSpecification();
+        LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
+        if (specList != null) {
+          final String[] specs = specList.split(":");
+          for (final String s : specs) {
+            final HttpHandler h = new HttpServerBridgeEventHandler();
+            h.setUriSpecification(s);
+            this.httpServer.addHttpHandler(h);
+          }
+        }
+      }
+      this.clrBridgeSetup = true;
+    }
+    LOG.log(Level.INFO, "CLR Bridge setup.");
+  }
+
+  private CLRBufferedLogHandler getCLRBufferedLogHandler() {
+    for (Handler handler : Logger.getLogger("").getHandlers()) {
+      if (handler instanceof CLRBufferedLogHandler)
+        return (CLRBufferedLogHandler) handler;
+    }
+    return null;
+  }
+
+  private void submitEvaluator(final AllocatedEvaluator eval, EvaluatorType type) {
+    synchronized (JobDriver.this) {
+      eval.setType(type);
+      LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
+          new Object[]{eval.getId(), JobDriver.this.contexts.size()});
+      if (JobDriver.this.allocatedEvaluatorHandler == 0) {
+        throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
+      }
+      AllocatedEvaluatorBridge allocatedEvaluatorBridge = new AllocatedEvaluatorBridge(eval, JobDriver.this.nameServerInfo);
+      NativeInterop.ClrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler, allocatedEvaluatorBridge, this.interopLogger);
+    }
+  }
+
+  /**
+   * Submit a Task to a single Evaluator.
+   */
+  private void submit(final ActiveContext context) {
+    try {
+      LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
+      if (JobDriver.this.activeContextHandler == 0) {
+        throw new RuntimeException("Active Context Handler not initialized by CLR.");
+      }
+      ActiveContextBridge activeContextBridge = new ActiveContextBridge(context);
+      NativeInterop.ClrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, JobDriver.this.interopLogger);
+    } catch (final Exception ex) {
+      LOG.log(Level.SEVERE, "Fail to submit task to active context");
+      context.close();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * Handles AllocatedEvaluator events: the Evaluator is submitted to the CLR
+   * with its type set to EvaluatorType.CLR.
+   */
+  final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String evaluatorId = allocatedEvaluator.getId();
+      try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(evaluatorId)) {
+        synchronized (JobDriver.this) {
+          LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
+          submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that a new Context is available.
+   * The context is recorded in the contexts map and then submitted to the CLR.
+   */
+  final class ActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
+        synchronized (JobDriver.this) {
+          final Object[] logArgs = {context.getId()};
+          LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}", logArgs);
+          contexts.put(context.getId(), context);
+          submit(context);
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Task has completed successfully.
+   * The task's return value is sent to the client and, when the CLR registered
+   * a CompletedTask handler, the event is also handed to the CLR.
+   */
+  final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+    @Override
+    public void onNext(final CompletedTask task) {
+      LOG.log(Level.INFO, "Completed task: {0}", task.getId());
+      try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
+        // Turn the bytes returned by the task into the text that is sent to the client.
+        String result = "default result";
+        try {
+          result = new String(task.get());
+        } catch (final Exception e) {
+          // Best effort: fall back to the default text, but keep the cause in the log record.
+          LOG.log(Level.WARNING, "failed to decode task outcome", e);
+        }
+        LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
+        JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
+        if (JobDriver.this.completedTaskHandler == 0) {
+          LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
+        } else {
+          LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
+          CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task);
+          NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger);
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the entire Evaluator had failed.
+   * The failed contexts are removed from the contexts map, the client is notified,
+   * and the failure is passed to the CLR FailedEvaluator handler if one is registered.
+   * When the CLR bridge is not set up yet, the hand-over is retried via the clock.
+   */
+  final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator eval) {
+      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
+        synchronized (JobDriver.this) {
+          // The {0} placeholder is required; without it the parameter is never rendered.
+          LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval);
+          for (final FailedContext failedContext : eval.getFailedContextList()) {
+            String failedContextId = failedContext.getId();
+            LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
+            JobDriver.this.contexts.remove(failedContextId);
+          }
+          final String message = "Evaluator " + eval.getId() + " failed with message: "
+              + eval.getEvaluatorException().getMessage();
+          JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+
+          if (failedEvaluatorHandler != 0) {
+            handleFailedEvaluatorInCLR(eval);
+          } else if (JobDriver.this.clrBridgeSetup) {
+            reportMissingHandler();
+          } else {
+            // The bridge is not up yet: poll until it is, then decide again.
+            clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+              @Override
+              public void onNext(final Alarm time) {
+                if (!JobDriver.this.clrBridgeSetup) {
+                  LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
+                  clock.scheduleAlarm(5000, this);
+                } else if (JobDriver.this.failedEvaluatorHandler == 0) {
+                  // Bridge is up but the CLR registered no handler: do not call
+                  // into the CLR with a zero handle, same as the direct path above.
+                  reportMissingHandler();
+                } else {
+                  handleFailedEvaluatorInCLR(eval);
+                }
+              }
+            });
+          }
+        }
+      }
+    }
+
+    /**
+     * Logs, and tells the client, that the CLR registered no FailedEvaluator handler.
+     */
+    private void reportMissingHandler() {
+      final String message = "No CLR FailedEvaluator handler was set, exiting now";
+      LOG.log(Level.WARNING, message);
+      JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+    }
+
+    /**
+     * Passes the failed Evaluator to the CLR handler, logs how many additional
+     * Evaluators the CLR requested in response, and notifies the client.
+     *
+     * @param eval the failed Evaluator.
+     */
+    private void handleFailedEvaluatorInCLR(final FailedEvaluator eval) {
+      final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
+      LOG.log(Level.INFO, message);
+      FailedEvaluatorBridge failedEvaluatorBridge = new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, JobDriver.this.isRestarted, loggingScopeFactory);
+      NativeInterop.ClrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger);
+      int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
+      if (additionalRequestedEvaluatorNumber > 0) {
+        LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " + additionalRequestedEvaluatorNumber);
+      }
+      JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+    }
+  }
+
+  /**
+   * HTTP handler that forwards each request to the CLR HTTP server handler
+   * and writes the CLR's response data to the servlet response.
+   */
+  final class HttpServerBridgeEventHandler implements HttpHandler {
+    private String uriSpecification;
+
+    /**
+     * @return the URI specification this handler is registered for.
+     */
+    @Override
+    public String getUriSpecification() {
+      return this.uriSpecification;
+    }
+
+    /**
+     * @param s the URI specification this handler is registered for.
+     */
+    public void setUriSpecification(String s) {
+      this.uriSpecification = s;
+    }
+
+    /**
+     * Serializes the request with Avro, lets the CLR handle it and prints the
+     * CLR's response (decoded as UTF-8) to the servlet response.
+     *
+     * @throws RuntimeException wrapping any failure while calling the CLR or writing the response.
+     */
+    @Override
+    public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException {
+      LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
+      try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
+        final AvroHttpSerializer serializer = new AvroHttpSerializer();
+        final byte[] requestBytes = serializer.toBytes(serializer.toAvro(parsedHttpRequest));
+
+        try {
+          final HttpServerEventBridge bridge = new HttpServerEventBridge(requestBytes);
+          NativeInterop.ClrSystemHttpServerHandlerOnNext(httpServerEventHandler, bridge, interopLogger);
+          final byte[] responseBytes = bridge.getQueryResponseData();
+          final String responseBody = new String(responseBytes, "UTF-8");
+          response.getWriter().println(responseBody);
+          LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
+        } catch (final Exception failure) {
+          LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", failure);
+          throw new RuntimeException(failure);
+        }
+      }
+    }
+  }
+
+  /**
+   * Handle failed task: the failure is passed to the CLR FailedTask handler.
+   * A missing CLR handler is treated as an error.
+   */
+  final class FailedTaskHandler implements EventHandler<FailedTask> {
+    @Override
+    public void onNext(final FailedTask task) throws RuntimeException {
+      LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
+      final boolean handlerMissing = failedTaskHandler == 0;
+      if (handlerMissing) {
+        LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
+        throw new RuntimeException("Failed Task Handler not initialized by CLR.");
+      }
+      try {
+        final FailedTaskBridge bridge = new FailedTaskBridge(task);
+        NativeInterop.ClrSystemFailedTaskHandlerOnNext(failedTaskHandler, bridge, interopLogger);
+      } catch (final Exception failure) {
+        LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
+        throw new RuntimeException(failure);
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Task is running.
+   * The event is passed to the CLR RunningTask handler when one is registered.
+   */
+  final class RunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+      try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
+        if (runningTaskHandler == 0) {
+          LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
+          return;
+        }
+        LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
+        try {
+          NativeInterop.ClrSystemRunningTaskHandlerOnNext(
+              runningTaskHandler, new RunningTaskBridge(task), interopLogger);
+        } catch (final Exception failure) {
+          LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
+          throw new RuntimeException(failure);
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Task is running when driver restarted.
+   * The hand-over to the CLR is deferred through the clock until the CLR bridge is set up.
+   */
+  final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
+    @Override
+    public void onNext(final RunningTask task) {
+      try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
+        final EventHandler<Alarm> handOver = new EventHandler<Alarm>() {
+          @Override
+          public void onNext(final Alarm time) {
+            if (!JobDriver.this.clrBridgeSetup) {
+              // Bridge not ready yet: check again in two seconds.
+              LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart RunningTaskHandler...");
+              clock.scheduleAlarm(2000, this);
+              return;
+            }
+            if (JobDriver.this.driverRestartRunningTaskHandler == 0) {
+              LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler.");
+              return;
+            }
+            LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
+            NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task));
+          }
+        };
+        clock.scheduleAlarm(0, handOver);
+      }
+    }
+  }
+
+  /**
+   * Receive notification that a context is active on an Evaluator when the driver restarted.
+   * The context is recorded in the contexts map; the hand-over to the CLR is
+   * deferred through the clock until the CLR bridge is set up.
+   */
+  final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
+    @Override
+    public void onNext(final ActiveContext context) {
+      try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
+        // The contexts map is a plain HashMap that the other handlers only touch
+        // while holding the JobDriver monitor; do the same here.
+        synchronized (JobDriver.this) {
+          JobDriver.this.contexts.put(context.getId(), context);
+        }
+        LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
+        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+          @Override
+          public void onNext(final Alarm time) {
+            if (JobDriver.this.clrBridgeSetup) {
+              if (JobDriver.this.driverRestartActiveContextHandler != 0) {
+                LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
+                NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context));
+              } else {
+                LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler.");
+              }
+            } else {
+              // Bridge not ready yet: check again in two seconds.
+              LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart DriverRestartActiveContextHandler...");
+              clock.scheduleAlarm(2000, this);
+            }
+          }
+        });
+      }
+    }
+  }
+
+  /**
+   * Job Driver is ready and the clock is set up: set up the CLR bridge and let
+   * the CLR EvaluatorRequestor handler request the evaluators.
+   */
+  final class StartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
+        synchronized (JobDriver.this) {
+          setupBridge(startTime);
+          LOG.log(Level.INFO, "Driver Started");
+
+          if (evaluatorRequestorHandler == 0) {
+            throw new RuntimeException("Evaluator Requestor Handler not initialized by CLR.");
+          }
+
+          final EvaluatorRequestorBridge requestorBridge =
+              new EvaluatorRequestorBridge(evaluatorRequestor, false, loggingScopeFactory);
+          NativeInterop.ClrSystemEvaluatorRequstorHandlerOnNext(evaluatorRequestorHandler, requestorBridge, interopLogger);
+          // Report how many evaluators the CLR handler asked for.
+          LOG.log(Level.INFO, "evaluator requested at start up: " + requestorBridge.getEvaluatorNumber());
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Job driver is restarted after previous crash: set up the CLR bridge again
+   * and remember that this driver instance is a restarted one.
+   */
+  final class RestartHandler implements EventHandler<StartTime> {
+    @Override
+    public void onNext(final StartTime startTime) {
+      try (final LoggingScope ls = loggingScopeFactory.driverRestart(startTime)) {
+        synchronized (JobDriver.this) {
+          setupBridge(startTime);
+          isRestarted = true;
+          LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that driver restart has completed.
+   * The event is passed to the CLR driver restart handler when one is registered.
+   */
+  final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
+    @Override
+    public void onNext(final DriverRestartCompleted driverRestartCompleted) {
+      LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", driverRestartCompleted.getTimeStamp());
+      try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(driverRestartCompleted.getTimeStamp())) {
+        if (driverRestartHandler == 0) {
+          LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
+          return;
+        }
+        LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
+        NativeInterop.ClrSystemDriverRestartHandlerOnNext(driverRestartHandler);
+      }
+    }
+  }
+
+  /**
+   * Shutting down the job driver: close all contexts that are still recorded as active.
+   */
+  final class StopHandler implements EventHandler<StopTime> {
+    @Override
+    public void onNext(final StopTime time) {
+      LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
+      try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
+        // Other handlers add to and remove from the contexts map while holding the
+        // JobDriver monitor. Take a snapshot under the same monitor and close the
+        // contexts outside of it, so that a concurrent removal cannot break the iteration.
+        final List<ActiveContext> contextsToClose;
+        synchronized (JobDriver.this) {
+          contextsToClose = new ArrayList<>(contexts.values());
+        }
+        for (final ActiveContext context : contextsToClose) {
+          context.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive a message sent by a running Task: log it and, when the CLR
+   * registered a TaskMessage handler, pass the message bytes to the CLR.
+   * The taskMessageReceived logging scope is currently not opened by this handler.
+   */
+  final class TaskMessageHandler implements EventHandler<TaskMessage> {
+    @Override
+    public void onNext(final TaskMessage taskMessage) {
+      final String text = new String(taskMessage.get());
+      LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", text);
+      if (taskMessageHandler == 0) {
+        // No CLR handler registered for task messages: nothing more to do.
+        return;
+      }
+      final TaskMessageBridge bridge = new TaskMessageBridge(taskMessage);
+      NativeInterop.ClrSystemTaskMessageHandlerOnNext(taskMessageHandler, taskMessage.get(), bridge, interopLogger);
+    }
+  }
+
+  /**
+   * Receive notification that the Task has been suspended.
+   * The event goes to the CLR SuspendedTask handler when one is registered;
+   * the client is notified in either case.
+   */
+  final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+    @Override
+    public final void onNext(final SuspendedTask task) {
+      final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
+      LOG.log(Level.INFO, message);
+      try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
+        final boolean clrHandlerBound = suspendedTaskHandler != 0;
+        if (clrHandlerBound) {
+          final SuspendedTaskBridge bridge = new SuspendedTaskBridge(task);
+          LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
+          NativeInterop.ClrSystemSupendedTaskHandlerOnNext(suspendedTaskHandler, bridge);
+        }
+        jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
+      }
+    }
+  }
+
+  /**
+   * Receive notification that the Evaluator has been shut down.
+   * The event is passed to the CLR CompletedEvaluator handler when one is registered.
+   */
+  final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
+    @Override
+    public void onNext(final CompletedEvaluator evaluator) {
+      LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
+      try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
+        if (completedEvaluatorHandler == 0) {
+          // No CLR handler registered for completed evaluators.
+          return;
+        }
+        final CompletedEvaluatorBridge bridge = new CompletedEvaluatorBridge(evaluator);
+        LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
+        NativeInterop.ClrSystemCompletdEvaluatorHandlerOnNext(completedEvaluatorHandler, bridge);
+      }
+    }
+  }
+
+
+  /**
+   * Receive notification that the Context had completed.
+   * The event goes to the CLR ClosedContext handler when one is registered;
+   * afterwards the context is removed from the contexts map.
+   */
+  final class ClosedContextHandler implements EventHandler<ClosedContext> {
+    @Override
+    public void onNext(final ClosedContext context) {
+      LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
+      try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
+        final boolean clrHandlerBound = closedContextHandler != 0;
+        if (clrHandlerBound) {
+          final ClosedContextBridge bridge = new ClosedContextBridge(context);
+          LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
+          NativeInterop.ClrSystemClosedContextHandlerOnNext(closedContextHandler, bridge);
+        }
+        synchronized (JobDriver.this) {
+          contexts.remove(context.getId());
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Receive notification that the Context had failed.
+   * The event goes to the CLR FailedContext handler when one is registered; the
+   * context is then removed from the contexts map and, if the failure carries
+   * data, that data is sent to the client.
+   */
+  final class FailedContextHandler implements EventHandler<FailedContext> {
+    @Override
+    public void onNext(final FailedContext context) {
+      // The {0} placeholder is required; without it the parameter is never rendered.
+      LOG.log(Level.SEVERE, "FailedContext: {0}", context);
+      // NOTE(review): this opens the evaluatorFailed scope for a failed context -
+      // confirm whether LoggingScopeFactory offers a context-specific scope.
+      try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
+        if (JobDriver.this.failedContextHandler != 0) {
+          FailedContextBridge failedContextBridge = new FailedContextBridge(context);
+          // if CLR implements the failed context handler, handle it in CLR
+          LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
+          NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);
+        }
+        synchronized (JobDriver.this) {
+          JobDriver.this.contexts.remove(context.getId());
+        }
+        Optional<byte[]> err = context.getData();
+        if (err.isPresent()) {
+          JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
+        }
+      }
+    }
+  }
+
+  /**
+   * Receive notification that a ContextMessage has been received.
+   * The message is logged and, when the CLR registered a ContextMessage handler,
+   * passed to the CLR.
+   */
+  final class ContextMessageHandler implements EventHandler<ContextMessage> {
+    @Override
+    public void onNext(final ContextMessage message) {
+      // Decode the payload once. Calling toString() on the byte array would only
+      // yield its identity string, not the message text.
+      final String text = new String(message.get());
+      LOG.log(Level.INFO, "Received ContextMessage: {0}", text);
+      try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(text)) {
+        if (JobDriver.this.contextMessageHandler != 0) {
+          ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
+          // if CLR implements the context message handler, handle it in CLR
+          LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
+          NativeInterop.ClrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, contextMessageBridge);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
new file mode 100644
index 0000000..b1473ee
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.client.ClientConfiguration;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.util.logging.LoggingScopeImpl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Clr Bridge example - main class.
+ */
+public final class Launch {
+
+  /**
+   * Number of REEF worker threads in local mode. We assume maximum 10 evaluators can be requested on local runtime
+   */
+  private static final int NUM_LOCAL_THREADS = 10;
+  /**
+   * Standard Java logger
+   */
+  private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+  /**
+   * This class should not be instantiated.
+   */
+  private Launch() {
+    throw new RuntimeException("Do not instantiate this class!");
+  }
+
+  /**
+   * Parse the command line arguments.
+   *
+   * @param args command line arguments, as passed to main()
+   * @return Configuration object.
+   * @throws org.apache.reef.tang.exceptions.BindException configuration error.
+   * @throws java.io.IOException                           error reading the configuration.
+   */
+  private static Configuration parseCommandLine(final String[] args)
+      throws BindException, IOException {
+    final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+    final CommandLine cl = new CommandLine(confBuilder);
+    cl.registerShortNameOfClass(Local.class);
+    cl.registerShortNameOfClass(NumRuns.class);
+    cl.registerShortNameOfClass(WaitTimeForDriver.class);
+    cl.registerShortNameOfClass(DriverMemoryInMb.class);
+    cl.registerShortNameOfClass(DriverIdentifier.class);
+    cl.registerShortNameOfClass(DriverJobSubmissionDirectory.class);
+    cl.registerShortNameOfClass(Submit.class);
+    cl.processCommandLine(args);
+    return confBuilder.build();
+  }
+
+  private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+      throws InjectionException, BindException {
+    final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+    final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+    cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class)));
+    return cb.build();
+  }
+
+  /**
+   * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+   *
+   * @param args Command line arguments, as passed into main().
+   * @return (immutable) TANG Configuration object.
+   * @throws org.apache.reef.tang.exceptions.BindException      if configuration commandLineInjector fails.
+   * @throws org.apache.reef.tang.exceptions.InjectionException if configuration commandLineInjector fails.
+   * @throws java.io.IOException                                error reading the configuration.
+   */
+  private static Configuration getClientConfiguration(final String[] args)
+      throws BindException, InjectionException, IOException {
+
+    try (final LoggingScope ls = LoggingScopeFactory.getNewLoggingScope(Level.INFO, "Launch::getClientConfiguration")) {
+      final Configuration commandLineConf = parseCommandLine(args);
+
+      final Configuration clientConfiguration = ClientConfiguration.CONF
+          .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
+          .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
+          .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
+          //.set(ClientConfiguration.ON_WAKE_ERROR, JobClient.WakeErrorHandler.class )
+          .build();
+
+      // TODO: Remove the injector, have stuff injected.
+      final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+      final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+      final Configuration runtimeConfiguration;
+      if (isLocal) {
+        LOG.log(Level.INFO, "Running on the local runtime");
+        runtimeConfiguration = LocalRuntimeConfiguration.CONF
+            .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+            .build();
+      } else {
+        LOG.log(Level.INFO, "Running on YARN");
+        runtimeConfiguration = YarnClientConfiguration.CONF.build();
+      }
+
+      return Tang.Factory.getTang()
+          .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
+              cloneCommandLineConfiguration(commandLineConf))
+          .build();
+    }
+  }
+
+  /**
+   * Main method that starts the CLR Bridge from Java
+   *
+   * @param args command line parameters.
+   */
+  public static void main(final String[] args) {
+    LOG.log(Level.INFO, "Entering Launch at :::" + new Date());
+    try {
+      if (args == null || args.length == 0) {
+        throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied.");
+      }
+      final File dotNetFolder = new File(args[0]).getAbsoluteFile();
+      String[] removedArgs = Arrays.copyOfRange(args, 1, args.length);
+
+      final Configuration config = getClientConfiguration(removedArgs);
+      final Injector commandLineInjector = Tang.Factory.getTang().newInjector(parseCommandLine(removedArgs));
+      final int waitTime = commandLineInjector.getNamedInstance(WaitTimeForDriver.class);
+      final int driverMemory = commandLineInjector.getNamedInstance(DriverMemoryInMb.class);
+      final String driverIdentifier = commandLineInjector.getNamedInstance(DriverIdentifier.class);
+      final String jobSubmissionDirectory = commandLineInjector.getNamedInstance(DriverJobSubmissionDirectory.class);
+      final boolean submit = commandLineInjector.getNamedInstance(Submit.class);
+      final Injector injector = Tang.Factory.getTang().newInjector(config);
+      final JobClient client = injector.getInstance(JobClient.class);
+      client.setDriverInfo(driverIdentifier, driverMemory, jobSubmissionDirectory);
+
+      if (submit) {
+        client.submit(dotNetFolder, true, null);
+        client.waitForCompletion(waitTime);
+      } else {
+        client.submit(dotNetFolder, false, config);
+        client.waitForCompletion(0);
+      }
+
+
+      LOG.info("Done!");
+    } catch (final BindException | InjectionException | IOException ex) {
+      LOG.log(Level.SEVERE, "Job configuration error", ex);
+    }
+  }
+
+  /**
+   * Command line parameter: number of experiments to run.
+   */
+  @NamedParameter(doc = "Number of times to run the command",
+      short_name = "num_runs", default_value = "1")
+  public static final class NumRuns implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter = true to run locally, or false to run on YARN.
+   */
+  @NamedParameter(doc = "Whether or not to run on the local runtime",
+      short_name = "local", default_value = "true")
+  public static final class Local implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter, number of seconds  to wait till driver finishes ,
+   * = -1 : waits forever
+   * = 0: exit immediately without wait for driver.
+   */
+  @NamedParameter(doc = "Whether or not to wait for driver to finish",
+      short_name = "wait_time", default_value = "-1")
+  public static final class WaitTimeForDriver implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter, driver memory, in MB
+   */
+  @NamedParameter(doc = "memory allocated to driver JVM",
+      short_name = "driver_memory", default_value = "512")
+  public static final class DriverMemoryInMb implements Name<Integer> {
+  }
+
+  /**
+   * Command line parameter, driver identifier
+   */
+  @NamedParameter(doc = "driver identifier for clr bridge",
+      short_name = "driver_id", default_value = "ReefClrBridge")
+  public static final class DriverIdentifier implements Name<String> {
+  }
+
+  /**
+   * Command line parameter = true to submit the job with driver config, or false to write config to current directory
+   */
+  @NamedParameter(doc = "Whether or not to submit the reef job after driver config is constructed",
+      short_name = "submit", default_value = "true")
+  public static final class Submit implements Name<Boolean> {
+  }
+
+  /**
+   * Command line parameter, job submission directory, if set, user should guarantee its uniqueness
+   */
+  @NamedParameter(doc = "driver job submission directory",
+      short_name = "submission_directory", default_value = "empty")
+  public static final class DriverJobSubmissionDirectory implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java
new file mode 100644
index 0000000..ba2a5cb
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Clr Bridge example - headless main class: builds the driver configuration from the
+ * files of a CLR folder and submits it on YARN.
+ */
+public final class LaunchHeadless {
+
+  /**
+   * Standard Java logger
+   */
+  private static final Logger LOG = Logger.getLogger(LaunchHeadless.class.getName());
+
+  /**
+   * This class should not be instantiated.
+   */
+  private LaunchHeadless() {
+    throw new RuntimeException("Do not instantiate this class!");
+  }
+
+  /**
+   * Main method that starts the CLR Bridge from Java.
+   * <p/>
+   * Every readable regular file found directly inside the CLR folder is added to the
+   * driver configuration as a global file; the merged driver and HTTP configuration is
+   * then submitted using the YARN client configuration.
+   *
+   * @param args command line parameters; {@code args[0]} is the CLR folder.
+   * @throws IllegalArgumentException if no argument is supplied or the CLR folder cannot be listed.
+   */
+  public static void main(final String[] args) {
+    try {
+      if (args == null || args.length == 0) {
+        throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied.");
+      }
+      final File dotNetFolder = new File(args[0]).getAbsoluteFile();
+
+      // File.listFiles() returns null (not an empty array) when the path is not a directory
+      // or cannot be read; fail with a clear message instead of a NullPointerException.
+      final File[] clrFiles = dotNetFolder.listFiles();
+      if (clrFiles == null) {
+        throw new IllegalArgumentException("The clrFolder is not a readable directory: " + dotNetFolder);
+      }
+
+      ConfigurationModule driverConfigModule = JobClient.getDriverConfiguration();
+      for (final File f : clrFiles) {
+        if (f.canRead() && f.exists() && f.isFile()) {
+          // set() returns the updated module, so the result has to be kept
+          driverConfigModule = driverConfigModule.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
+        }
+      }
+
+      final Configuration driverConfiguration =
+          Configurations.merge(driverConfigModule.build(), JobClient.getHTTPConfiguration());
+
+      LOG.log(Level.INFO, "Running on YARN");
+
+      final Configuration runtimeConfiguration = YarnClientConfiguration.CONF.build();
+
+      final REEF reef = Tang.Factory.getTang().newInjector(runtimeConfiguration).getInstance(REEFImplementation.class);
+      reef.submit(driverConfiguration);
+
+      LOG.info("Done!");
+    } catch (final BindException | InjectionException ex) {
+      LOG.log(Level.SEVERE, "Job configuration error", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java
new file mode 100644
index 0000000..d93f6f4
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Generic java bridge driver/client
+ */
+package org.apache.reef.javabridge.generic;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java
new file mode 100644
index 0000000..46629c9
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.logging;
+
+import org.apache.reef.javabridge.NativeInterop;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.SimpleFormatter;
+
+/**
+ * Logging Handler to intercept java logs and transfer them
+ * to the CLR side via the reef-bridge.
+ * <p/>
+ * Logs are buffered to avoid the cost of reef-bridge function calls.
+ * A thread is also scheduled to flush the log buffer at a certain interval,
+ * in case the log buffer remains unfilled for an extended period of time.
+ * <p/>
+ * The buffer and the initialization flag are only accessed while holding
+ * this handler's monitor.
+ */
+public class CLRBufferedLogHandler extends Handler {
+  /** Number of buffered records at which publish() triggers a flush. */
+  private static final int BUFFER_LEN = 10;
+  /** Number of threads of the scheduler that performs the periodic flush. */
+  private static final int NUM_THREADS = 1;
+  /** Period of the scheduled flush. */
+  private static final long LOG_SCHEDULE_PERIOD = 15;  // seconds
+  private final SimpleFormatter formatter;
+  private final ArrayList<LogRecord> logs;
+  private boolean driverInitialized;
+  private final ScheduledThreadPoolExecutor logScheduler;
+
+  @Inject
+  public CLRBufferedLogHandler() {
+    super();
+    this.formatter = new SimpleFormatter();
+    this.logs = new ArrayList<LogRecord>();
+    this.driverInitialized = false;
+    this.logScheduler = new ScheduledThreadPoolExecutor(NUM_THREADS);
+  }
+
+  /**
+   * Signals the java-bridge has been initialized and that we can begin logging.
+   * Usually called from the StartHandler after the driver is up.
+   */
+  public void setDriverInitialized() {
+    synchronized (this) {
+      this.driverInitialized = true;
+    }
+    startLogScheduler();
+  }
+
+  /**
+   * Called whenever a log message is received on the java side.
+   * <p/>
+   * Adds the log record to the log buffer. If the log buffer is full and
+   * the driver has already been initialized, flush the buffer of the logs.
+   *
+   * @param record the record to buffer; null and non-loggable records are ignored.
+   */
+  @Override
+  public void publish(LogRecord record) {
+    if (record == null) {
+      return;
+    }
+
+    if (!isLoggable(record)) {
+      return;
+    }
+
+    synchronized (this) {
+      this.logs.add(record);
+      if (!this.driverInitialized || this.logs.size() < BUFFER_LEN) {
+        return;
+      }
+    }
+
+    logAll();
+  }
+
+  /**
+   * Flushes the buffered logs to the CLR side. Does nothing while the driver
+   * has not been initialized; the records stay buffered until then.
+   */
+  @Override
+  public void flush() {
+    logAll();
+  }
+
+  /**
+   * Flushes the remaining buffered logs and shuts down the log scheduler thread.
+   */
+  @Override
+  public synchronized void close() throws SecurityException {
+    this.logAll();
+    this.logScheduler.shutdown();
+  }
+
+  /**
+   * Starts a thread to flush the log buffer on an interval.
+   * <p/>
+   * This will ensure that logs get flushed periodically, even
+   * if the log buffer is not full.
+   */
+  private void startLogScheduler() {
+    this.logScheduler.scheduleAtFixedRate(
+        new Runnable() {
+          @Override
+          public void run() {
+            CLRBufferedLogHandler.this.logAll();
+          }
+        }, 0, LOG_SCHEDULE_PERIOD, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Flushes the log buffer, sending all buffered log messages as one string through
+   * the reef-bridge log function, tagged with the highest level found in the buffer.
+   * <p/>
+   * Nothing is sent while the driver is not initialized or when the buffer is empty,
+   * so the periodic flush does not cross the bridge without anything to report.
+   */
+  private void logAll() {
+    synchronized (this) {
+      if (!this.driverInitialized || this.logs.isEmpty()) {
+        return;
+      }
+
+      final StringBuilder sb = new StringBuilder();
+      Level highestLevel = Level.FINEST;
+      for (final LogRecord record : this.logs) {
+        sb.append(formatter.format(record));
+        sb.append("\n");
+        if (record.getLevel().intValue() > highestLevel.intValue()) {
+          highestLevel = record.getLevel();
+        }
+      }
+      try {
+        final int level = getLevel(highestLevel);
+        NativeInterop.ClrBufferedLog(level, sb.toString());
+      } catch (Exception e) {
+        // Reported on stderr rather than through a Logger, presumably so that the failure
+        // is not fed back into this very handler; include the cause to make it diagnosable.
+        System.err.println("Failed to perform CLRBufferedLogHandler: " + e);
+      }
+
+      // the buffer is cleared even after a failed call, so it cannot grow without bound
+      this.logs.clear();
+    }
+  }
+
+  /**
+   * Returns the integer value of the log record's level to be used
+   * by the CLR Bridge log function.
+   *
+   * @param recordLevel the java.util.logging level to translate.
+   * @return 0 for OFF, 1 for SEVERE, 2 for WARNING, 4 for ALL and 3 for any other level.
+   */
+  private int getLevel(Level recordLevel) {
+    if (recordLevel.equals(Level.OFF)) {
+      return 0;
+    } else if (recordLevel.equals(Level.SEVERE)) {
+      return 1;
+    } else if (recordLevel.equals(Level.WARNING)) {
+      return 2;
+    } else if (recordLevel.equals(Level.ALL)) {
+      return 4;
+    } else {
+      return 3;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java
new file mode 100644
index 0000000..7d82937
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.logging;
+
+import java.io.IOException;
+import java.util.logging.LogManager;
+
+public final class CLRLoggingConfig {
+
+  public CLRLoggingConfig() throws IOException {
+    LogManager.getLogManager().readConfiguration(
+        Thread.currentThread().getContextClassLoader()
+            .getResourceAsStream("com/microsoft/reef/clr.logging.properties"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java
new file mode 100644
index 0000000..e0e79ce
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Logging handler for clr bridge
+ */
+package org.apache.reef.util.logging;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties b/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties
new file mode 100644
index 0000000..41c4024
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Properties file which configures the operation of the JDK
+# logging facility.
+
+# The system will look for this config file, first using
+# a System property specified at startup:
+#
+# >java -Djava.util.logging.config.file=myLoggingConfigFilePath
+#
+# If this property is not specified, then the config file is
+# retrieved from its default location at:
+#
+# JDK_HOME/jre/lib/logging.properties
+
+# Global logging properties.
+# ------------------------------------------
+# The set of handlers to be loaded upon startup.
+# Comma-separated list of class names.
+# (? LogManager docs say no comma here, but JDK example has comma.)
+# handlers=java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+handlers=java.util.logging.ConsoleHandler,org.apache.reef.util.logging.CLRBufferedLogHandler
+
+java.util.logging.SimpleFormatter.format=%1$tF %1$tT,%1$tL %4$s %2$s - %5$s%6$s%n
+
+# Default global logging level.
+# Loggers and Handlers may override this level
+.level=ALL
+
+# Loggers
+# ------------------------------------------
+# Loggers are usually attached to packages.
+# Here, the level for each package is specified.
+# The global level is used by default, so levels
+# specified here simply act as an override.
+
+# org.apache.reef.examples.level=FINEST
+# org.apache.reef.tang.level=INFO
+
+# Handlers
+# -----------------------------------------
+
+# --- ConsoleHandler ---
+# Override of global logging level
+java.util.logging.ConsoleHandler.level=FINEST
+java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
+
+# --- FileHandler ---
+# Override of global logging level
+java.util.logging.FileHandler.level=FINEST
+
+# Naming style for the output file:
+# (The output file is placed in the directory
+# defined by the "user.home" System property.)
+java.util.logging.FileHandler.pattern=%h/reef.%u.log
+
+# Limiting size of output file in bytes:
+java.util.logging.FileHandler.limit=512000
+
+# Number of output files to cycle through, by appending an
+# integer to the base file name:
+java.util.logging.FileHandler.count=100
+
+# Style of output (Simple or XML):
+java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge/pom.xml b/lang/java/reef-bridge-project/reef-bridge/pom.xml
new file mode 100644
index 0000000..774e5f8
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>reef-bridge</artifactId>
+    <name>REEF Bridge</name>
+    <description>Bridge between JVM and CLR.</description>
+
+
+    <parent>
+        <groupId>org.apache.reef</groupId>
+        <artifactId>reef-bridge-project</artifactId>
+        <version>0.11.0-incubating-SNAPSHOT</version>
+    </parent>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-runtime-local</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-runtime-yarn</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-checkpoint</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-bridge-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-bridge-clr</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>unpack-dependencies</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>unpack-dependencies</goal>
+                        </goals>
+
+                        <configuration>
+                            <includeArtifactIds>reef-bridge-java,reef-bridge-clr</includeArtifactIds>
+                            <outputDirectory>
+                                ${project.build.directory}/classes/ReefDriverAppDlls
+                            </outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addClasspath>false</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                            <mainClass>org.apache.reef.javabridge.JavaBridge</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/maven-eclipse.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/maven-eclipse.xml b/lang/java/reef-checkpoint/maven-eclipse.xml
new file mode 100644
index 0000000..6c6b5ae
--- /dev/null
+++ b/lang/java/reef-checkpoint/maven-eclipse.xml
@@ -0,0 +1,28 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project default="copy-resources">
+  <target name="init"/>
+  <target name="copy-resources" depends="init">
+    <copy todir="target/classes/META-INF/conf" filtering="false">
+      <fileset dir="src/main/conf" includes="*.xml|*.properties" excludes="**/*.java"/>
+    </copy>
+  </target>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/pom.xml b/lang/java/reef-checkpoint/pom.xml
new file mode 100644
index 0000000..147a158
--- /dev/null
+++ b/lang/java/reef-checkpoint/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.reef</groupId>
+        <artifactId>reef-project</artifactId>
+        <version>0.11.0-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>reef-checkpoint</artifactId>
+    <name>REEF Checkpoint</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <test.build.webapps>${project.build.directory}/test-classes/webapps</test.build.webapps>
+    </properties>
+    <build>
+        <resources>
+            <resource>
+                <targetPath>META-INF/conf</targetPath>
+                <filtering>false</filtering>
+                <directory>${basedir}/src/main/conf</directory>
+                <includes>
+                    <include>*.xml</include>
+                    <include>*.properties</include>
+                    <include>webapps/**</include>
+                </includes>
+                <excludes>
+                </excludes>
+            </resource>
+        </resources>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>reef-annotations</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>tang</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
new file mode 100644
index 0000000..cd7cabe
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This interface represents the identifier (memento) for a checkpoint. It is allowed
+ * to contain a small amount of metadata about a checkpoint and must provide sufficient
+ * information to the corresponding CheckpointService to locate and retrieve the
+ * data contained in the checkpoint.
+ */
+public interface CheckpointID extends Writable {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
new file mode 100644
index 0000000..4415c46
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+/**
+ * This interface represents a naming service for checkpoints.
+ */
+public interface CheckpointNamingService {
+
+  /**
+   * Generates a new checkpoint name.
+   *
+   * @return the checkpoint name
+   */
+  public String getNewName();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
new file mode 100644
index 0000000..17af4ce
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * The CheckpointService provides a simple API to store and retrieve the state of a task.
+ * <p/>
+ * Checkpoints are atomic, single-writer, write-once, multiple-readers, read-many type of objects.
+ * This is provided by releasing the CheckpointID for a checkpoint only upon commit of the checkpoint,
+ * and by preventing a checkpoint from being re-opened for writes.
+ * <p/>
+ * Non-functional properties such as durability, availability, compression, garbage collection,
+ * quotas are left to the implementation.
+ * <p/>
+ * This API is envisioned as the basic building block for a checkpoint service, on top of which richer
+ * interfaces can be layered (e.g., frameworks providing object-serialization, checkpoint metadata and
+ * provenance, etc.)
+ */
+public interface CheckpointService {
+
+  /**
+   * This method creates a checkpoint and provides a channel to write to it.
+   * The name/location of the checkpoint is unknown to the user at this time; in fact,
+   * the CheckpointID is not released to the user until commit is called. This makes enforcing
+   * atomicity of writes easy.
+   *
+   * @return a CheckpointWriteChannel that can be used to write to the checkpoint
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  CheckpointWriteChannel create() throws IOException, InterruptedException;
+
+  /**
+   * Used to finalize an existing checkpoint. It returns the CheckpointID that can be later
+   * used to access (read-only) this checkpoint. This guarantees atomicity of the checkpoint.
+   *
+   * @param channel the CheckpointWriteChannel to commit
+   * @return a CheckpointID
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  CheckpointID commit(CheckpointWriteChannel channel) throws IOException, InterruptedException;
+
+  /**
+   * Dual to commit, it aborts the current checkpoint. Garbage collection choices are
+   * left to the implementation. The CheckpointID is neither generated nor released to the
+   * user so the checkpoint is not accessible.
+   *
+   * @param channel the CheckpointWriteChannel to abort
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void abort(CheckpointWriteChannel channel) throws IOException, InterruptedException;
+
+  /**
+   * Given a CheckpointID, returns a reading channel.
+   *
+   * @param checkpointId CheckpointID for the checkpoint to be opened
+   * @return a CheckpointReadChannel
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  CheckpointReadChannel open(CheckpointID checkpointId) throws IOException, InterruptedException;
+
+  /**
+   * Discards an existing checkpoint identified by its CheckpointID.
+   *
+   * @param checkpointId CheckpointID for the checkpoint to be deleted
+   * @return a boolean confirming success of the deletion
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean delete(CheckpointID checkpointId) throws IOException, InterruptedException;
+
+  interface CheckpointWriteChannel extends WritableByteChannel {
+  }
+
+  interface CheckpointReadChannel extends ReadableByteChannel {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
new file mode 100644
index 0000000..c71a191
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Naming service that returns the configured prefix followed by 8 random alphanumeric characters.
+ */
+public class RandomNameCNS implements CheckpointNamingService {
+
+  private final String prefix;
+
+  @Inject
+  public RandomNameCNS(@Parameter(PREFIX.class) final String prefix) {
+    this.prefix = prefix;
+  }
+
+  @Override
+  public String getNewName() {
+    return this.prefix + RandomStringUtils.randomAlphanumeric(8);
+  }
+
+  @NamedParameter(doc = "The prefix used for the random names returned.", default_value = "checkpoint_")
+  public static class PREFIX implements Name<String> {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
new file mode 100644
index 0000000..69e3cbd
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * A naming service that always returns {@code checkpoint_} followed by the name it was initialized with.
+ */
+public class SimpleNamingService implements CheckpointNamingService {
+
+  private final String name;
+
+  @Inject
+  public SimpleNamingService(@Parameter(CheckpointName.class) final String name) {
+    this.name = "checkpoint_" + name;
+  }
+
+  /**
+   * Returns the fixed checkpoint name built in the constructor; every call yields the same value.
+   *
+   * @return the checkpoint name
+   */
+  @Override
+  public String getNewName() {
+    return this.name;
+  }
+
+  /**
+   * Name appended to {@code checkpoint_} by the constructor; defaults to {@code reef}.
+   */
+  @NamedParameter(doc = "Checkpoint prefix.", short_name = "checkpoint_prefix", default_value = "reef")
+  public static final class CheckpointName implements Name<String> {
+  }
+}