You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:19 UTC
[46/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
new file mode 100644
index 0000000..b2e0083
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -0,0 +1,724 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.driver.client.JobMessageObserver;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.javabridge.*;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.CLRBufferedLogHandler;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.apache.reef.webserver.*;
+
+import javax.inject.Inject;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generic job driver for CLRBridge.
+ */
+@Unit
+public final class JobDriver {
+
+ private static final Logger LOG = Logger.getLogger(JobDriver.class.getName());
+ /**
+ * String codec is used to encode the results
+ * before passing them back to the client.
+ */
+ private static final ObjectSerializableCodec<String> JVM_CODEC = new ObjectSerializableCodec<>();
+ private final InteropLogger interopLogger = new InteropLogger();
+ private final NameServer nameServer;
+ private final String nameServerInfo;
+ private final HttpServer httpServer;
+ /**
+ * Wake clock is used to schedule periodical job check-ups.
+ */
+ private final Clock clock;
+ /**
+ * Job observer on the client.
+ * We use it to send results from the driver back to the client.
+ */
+ private final JobMessageObserver jobMessageObserver;
+ /**
+ * Job driver uses EvaluatorRequestor
+ * to request Evaluators that will run the Tasks.
+ */
+ private final EvaluatorRequestor evaluatorRequestor;
+
+ /**
+ * Driver status manager to monitor driver status
+ */
+ private final DriverStatusManager driverStatusManager;
+
+ /**
+ * NativeInterop has function to load libs when driver starts
+ */
+ private final LibLoader libLoader;
+
+ /**
+ * Shell execution results from each Evaluator.
+ */
+ private final List<String> results = new ArrayList<>();
+ /**
+ * Map from context ID to running evaluator context.
+ */
+ private final Map<String, ActiveContext> contexts = new HashMap<>();
+
+ /**
+ * Logging scope factory that provides LoggingScope
+ */
+ private final LoggingScopeFactory loggingScopeFactory;
+
+ private long evaluatorRequestorHandler = 0;
+ private long allocatedEvaluatorHandler = 0;
+ private long activeContextHandler = 0;
+ private long taskMessageHandler = 0;
+ private long failedTaskHandler = 0;
+ private long failedEvaluatorHandler = 0;
+ private long httpServerEventHandler = 0;
+ private long completedTaskHandler = 0;
+ private long runningTaskHandler = 0;
+ private long suspendedTaskHandler = 0;
+ private long completedEvaluatorHandler = 0;
+ private long closedContextHandler = 0;
+ private long failedContextHandler = 0;
+ private long contextMessageHandler = 0;
+ private long driverRestartHandler = 0;
+ private long driverRestartActiveContextHandler = 0;
+ private long driverRestartRunningTaskHandler = 0;
+ private boolean clrBridgeSetup = false;
+ private boolean isRestarted = false;
+
+ /**
+ * Job driver constructor.
+ * All parameters are injected from TANG automatically.
+ *
+ * @param clock Wake clock to schedule and check up running jobs.
+ * @param jobMessageObserver is used to send messages back to the client.
+ * @param evaluatorRequestor is used to request Evaluators.
+ */
+ @Inject
+ JobDriver(final Clock clock,
+ final HttpServer httpServer,
+ final NameServer nameServer,
+ final JobMessageObserver jobMessageObserver,
+ final EvaluatorRequestor evaluatorRequestor,
+ final DriverStatusManager driverStatusManager,
+ final LoggingScopeFactory loggingScopeFactory,
+ final LibLoader libLoader) {
+ this.clock = clock;
+ this.httpServer = httpServer;
+ this.jobMessageObserver = jobMessageObserver;
+ this.evaluatorRequestor = evaluatorRequestor;
+ this.nameServer = nameServer;
+ this.driverStatusManager = driverStatusManager;
+ this.nameServerInfo = NetUtils.getLocalAddress() + ":" + this.nameServer.getPort();
+ this.loggingScopeFactory = loggingScopeFactory;
+ this.libLoader = libLoader;
+ }
+
+ private void setupBridge(final StartTime startTime) {
+ // Signal to the clr buffered log handler that the driver has started and that
+ // we can begin logging
+ LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
+ try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
+
+ try {
+ libLoader.loadLib();
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to load CLR libraries");
+ }
+
+ final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
+ if (handler == null) {
+ LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
+ } else {
+ handler.setDriverInitialized();
+ LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
+ }
+
+ LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime});
+ String portNumber = httpServer == null ? null : Integer.toString((httpServer.getPort()));
+ long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString(), portNumber);
+ if (handlers != null) {
+ if (handlers.length != NativeInterop.nHandlers) {
+ throw new RuntimeException(
+ String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers",
+ String.valueOf(handlers.length),
+ String.valueOf(NativeInterop.nHandlers)));
+ }
+ this.evaluatorRequestorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.EvaluatorRequestorKey)];
+ this.allocatedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.AllocatedEvaluatorKey)];
+ this.activeContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ActiveContextKey)];
+ this.taskMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.TaskMessageKey)];
+ this.failedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedTaskKey)];
+ this.failedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedEvaluatorKey)];
+ this.httpServerEventHandler = handlers[NativeInterop.Handlers.get(NativeInterop.HttpServerKey)];
+ this.completedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedTaskKey)];
+ this.runningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.RunningTaskKey)];
+ this.suspendedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.SuspendedTaskKey)];
+ this.completedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedEvaluatorKey)];
+ this.closedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ClosedContextKey)];
+ this.failedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedContextKey)];
+ this.contextMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ContextMessageKey)];
+ this.driverRestartHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartKey)];
+ this.driverRestartActiveContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartActiveContextKey)];
+ this.driverRestartRunningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartRunningTaskKey)];
+ }
+
+ try (final LoggingScope lp = this.loggingScopeFactory.getNewLoggingScope("setupBridge::ClrSystemHttpServerHandlerOnNext")) {
+ final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
+ NativeInterop.ClrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, this.interopLogger);
+ final String specList = httpServerEventBridge.getUriSpecification();
+ LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
+ if (specList != null) {
+ final String[] specs = specList.split(":");
+ for (final String s : specs) {
+ final HttpHandler h = new HttpServerBridgeEventHandler();
+ h.setUriSpecification(s);
+ this.httpServer.addHttpHandler(h);
+ }
+ }
+ }
+ this.clrBridgeSetup = true;
+ }
+ LOG.log(Level.INFO, "CLR Bridge setup.");
+ }
+
+ private CLRBufferedLogHandler getCLRBufferedLogHandler() {
+ for (Handler handler : Logger.getLogger("").getHandlers()) {
+ if (handler instanceof CLRBufferedLogHandler)
+ return (CLRBufferedLogHandler) handler;
+ }
+ return null;
+ }
+
+ private void submitEvaluator(final AllocatedEvaluator eval, EvaluatorType type) {
+ synchronized (JobDriver.this) {
+ eval.setType(type);
+ LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}",
+ new Object[]{eval.getId(), JobDriver.this.contexts.size()});
+ if (JobDriver.this.allocatedEvaluatorHandler == 0) {
+ throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR.");
+ }
+ AllocatedEvaluatorBridge allocatedEvaluatorBridge = new AllocatedEvaluatorBridge(eval, JobDriver.this.nameServerInfo);
+ NativeInterop.ClrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler, allocatedEvaluatorBridge, this.interopLogger);
+ }
+ }
+
+ /**
+ * Submit a Task to a single Evaluator.
+ */
+ private void submit(final ActiveContext context) {
+ try {
+ LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context});
+ if (JobDriver.this.activeContextHandler == 0) {
+ throw new RuntimeException("Active Context Handler not initialized by CLR.");
+ }
+ ActiveContextBridge activeContextBridge = new ActiveContextBridge(context);
+ NativeInterop.ClrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, JobDriver.this.interopLogger);
+ } catch (final Exception ex) {
+ LOG.log(Level.SEVERE, "Fail to submit task to active context");
+ context.close();
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Handles AllocatedEvaluator: Submit an empty context
+ */
+ final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
+ @Override
+ public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
+ JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that a new Context is available.
+ */
+ final class ActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(final ActiveContext context) {
+ try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}",
+ new Object[]{context.getId()});
+ JobDriver.this.contexts.put(context.getId(), context);
+ JobDriver.this.submit(context);
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Task has completed successfully.
+ */
+ final class CompletedTaskHandler implements EventHandler<CompletedTask> {
+ @Override
+ public void onNext(final CompletedTask task) {
+ LOG.log(Level.INFO, "Completed task: {0}", task.getId());
+ try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
+ // Take the message returned by the task and add it to the running result.
+ String result = "default result";
+ try {
+ result = new String(task.get());
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "failed to decode task outcome");
+ }
+ LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
+ JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
+ if (JobDriver.this.completedTaskHandler == 0) {
+ LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
+ } else {
+ LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
+ CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task);
+ NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger);
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the entire Evaluator had failed.
+ */
+ final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
+ @Override
+ public void onNext(final FailedEvaluator eval) {
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.SEVERE, "FailedEvaluator", eval);
+ for (final FailedContext failedContext : eval.getFailedContextList()) {
+ String failedContextId = failedContext.getId();
+ LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
+ JobDriver.this.contexts.remove(failedContextId);
+ }
+ String message = "Evaluator " + eval.getId() + " failed with message: "
+ + eval.getEvaluatorException().getMessage();
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+
+ if (failedEvaluatorHandler == 0) {
+ if (JobDriver.this.clrBridgeSetup) {
+ message = "No CLR FailedEvaluator handler was set, exiting now";
+ LOG.log(Level.WARNING, message);
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+ return;
+ } else {
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ handleFailedEvaluatorInCLR(eval);
+ } else {
+ LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
+ clock.scheduleAlarm(5000, this);
+ }
+ }
+ });
+ }
+ } else {
+ handleFailedEvaluatorInCLR(eval);
+ }
+ }
+ }
+ }
+
+ private void handleFailedEvaluatorInCLR(final FailedEvaluator eval) {
+ final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
+ LOG.log(Level.INFO, message);
+ FailedEvaluatorBridge failedEvaluatorBridge = new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, JobDriver.this.isRestarted, loggingScopeFactory);
+ NativeInterop.ClrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger);
+ int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
+ if (additionalRequestedEvaluatorNumber > 0) {
+ LOG.log(Level.INFO, "number of additional evaluators requested after evaluator failure: " + additionalRequestedEvaluatorNumber);
+ }
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+ }
+ }
+
+ final class HttpServerBridgeEventHandler implements HttpHandler {
+ private String uriSpecification;
+
+ /**
+ * returns URI specification for the handler
+ */
+ @Override
+ public String getUriSpecification() {
+ return uriSpecification;
+ }
+
+ public void setUriSpecification(String s) {
+ uriSpecification = s;
+ }
+
+ /**
+ * process http request
+ */
+ @Override
+ public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException {
+ LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
+ try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
+ final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
+ final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
+ final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest);
+
+ try {
+ final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
+ NativeInterop.ClrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge, JobDriver.this.interopLogger);
+ final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
+ response.getWriter().println(responseBody);
+ LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
+ } catch (final Exception ex) {
+ LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Handle failed task.
+ */
+ final class FailedTaskHandler implements EventHandler<FailedTask> {
+ @Override
+ public void onNext(final FailedTask task) throws RuntimeException {
+ LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set.");
+ if (JobDriver.this.failedTaskHandler == 0) {
+ LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real.");
+ throw new RuntimeException("Failed Task Handler not initialized by CLR.");
+ }
+ try {
+ FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task);
+ NativeInterop.ClrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge, JobDriver.this.interopLogger);
+ } catch (final Exception ex) {
+ LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Task is running.
+ */
+ final class RunningTaskHandler implements EventHandler<RunningTask> {
+ @Override
+ public void onNext(final RunningTask task) {
+ try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
+ if (JobDriver.this.runningTaskHandler == 0) {
+ LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
+ } else {
+ LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
+ try {
+ final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task);
+ NativeInterop.ClrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, JobDriver.this.interopLogger);
+ } catch (final Exception ex) {
+ LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Task is running when driver restarted.
+ */
+ final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
+ @Override
+ public void onNext(final RunningTask task) {
+ try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
+ LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
+ NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task));
+ } else {
+ LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler.");
+ }
+ } else {
+ LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart RunningTaskHandler...");
+ clock.scheduleAlarm(2000, this);
+ }
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Receive notification that an context is active on Evaluator when the driver restarted
+ */
+ final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
+ @Override
+ public void onNext(final ActiveContext context) {
+ try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
+ JobDriver.this.contexts.put(context.getId(), context);
+ LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ if (JobDriver.this.driverRestartActiveContextHandler != 0) {
+ LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
+ NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context));
+ } else {
+ LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler.");
+ }
+ } else {
+ LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart DriverRestartActiveContextHandler...");
+ clock.scheduleAlarm(2000, this);
+ }
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Job Driver is ready and the clock is set up: request the evaluators.
+ */
+ final class StartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
+ synchronized (JobDriver.this) {
+
+ setupBridge(startTime);
+
+ LOG.log(Level.INFO, "Driver Started");
+
+ if (JobDriver.this.evaluatorRequestorHandler == 0) {
+ throw new RuntimeException("Evaluator Requestor Handler not initialized by CLR.");
+ }
+ EvaluatorRequestorBridge evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory);
+ NativeInterop.ClrSystemEvaluatorRequstorHandlerOnNext(JobDriver.this.evaluatorRequestorHandler, evaluatorRequestorBridge, JobDriver.this.interopLogger);
+ // get the evaluator numbers set by CLR handler
+ LOG.log(Level.INFO, "evaluator requested at start up: " + evaluatorRequestorBridge.getEvaluatorNumber());
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Job driver is restarted after previous crash
+ */
+ final class RestartHandler implements EventHandler<StartTime> {
+ @Override
+ public void onNext(final StartTime startTime) {
+ try (final LoggingScope ls = loggingScopeFactory.driverRestart(startTime)) {
+ synchronized (JobDriver.this) {
+
+ setupBridge(startTime);
+
+ JobDriver.this.isRestarted = true;
+
+ LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that driver restart has completed.
+ */
+ final class DriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
+ @Override
+ public void onNext(final DriverRestartCompleted driverRestartCompleted) {
+ LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", driverRestartCompleted.getTimeStamp());
+ try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(driverRestartCompleted.getTimeStamp())) {
+ if (JobDriver.this.driverRestartHandler != 0) {
+ LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
+ NativeInterop.ClrSystemDriverRestartHandlerOnNext(JobDriver.this.driverRestartHandler);
+ } else {
+ LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
+
+ }
+ }
+ }
+ }
+
+ /**
+ * Shutting down the job driver: close the evaluators.
+ */
+ final class StopHandler implements EventHandler<StopTime> {
+ @Override
+ public void onNext(final StopTime time) {
+ LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
+ try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
+ for (final ActiveContext context : contexts.values()) {
+ context.close();
+ }
+ }
+ }
+ }
+
+ final class TaskMessageHandler implements EventHandler<TaskMessage> {
+ @Override
+ public void onNext(final TaskMessage taskMessage) {
+ String msg = new String(taskMessage.get());
+ LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
+ //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
+ if (JobDriver.this.taskMessageHandler != 0) {
+ TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
+ // if CLR implements the task message handler, handle the bytes in CLR handler
+ NativeInterop.ClrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
+ }
+ //}
+ }
+ }
+
+ /**
+ * Receive notification that the Task has been suspended.
+ */
+ final class SuspendedTaskHandler implements EventHandler<SuspendedTask> {
+ @Override
+ public final void onNext(final SuspendedTask task) {
+ final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
+ LOG.log(Level.INFO, message);
+ try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
+ if (JobDriver.this.suspendedTaskHandler != 0) {
+ SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task);
+ // if CLR implements the suspended task handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
+ NativeInterop.ClrSystemSupendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
+ }
+ JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
+ }
+ }
+ }
+
+ /**
+ * Receive notification that the Evaluator has been shut down.
+ */
+ final class CompletedEvaluatorHandler implements EventHandler<CompletedEvaluator> {
+ @Override
+ public void onNext(final CompletedEvaluator evaluator) {
+ LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
+ if (JobDriver.this.completedEvaluatorHandler != 0) {
+ CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
+ // if CLR implements the completed evaluator handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
+ NativeInterop.ClrSystemCompletdEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge);
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Receive notification that the Context had completed.
+ * Remove context from the list of active context.
+ */
+ final class ClosedContextHandler implements EventHandler<ClosedContext> {
+ @Override
+ public void onNext(final ClosedContext context) {
+ LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
+ try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
+ if (JobDriver.this.closedContextHandler != 0) {
+ ClosedContextBridge closedContextBridge = new ClosedContextBridge(context);
+ // if CLR implements the closed context handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
+ NativeInterop.ClrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
+ }
+ synchronized (JobDriver.this) {
+ JobDriver.this.contexts.remove(context.getId());
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Receive notification that the Context had failed.
+ * Remove context from the list of active context and notify the client.
+ */
+ final class FailedContextHandler implements EventHandler<FailedContext> {
+ @Override
+ public void onNext(final FailedContext context) {
+ LOG.log(Level.SEVERE, "FailedContext", context);
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
+ if (JobDriver.this.failedContextHandler != 0) {
+ FailedContextBridge failedContextBridge = new FailedContextBridge(context);
+ // if CLR implements the failed context handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
+ NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);
+ }
+ synchronized (JobDriver.this) {
+ JobDriver.this.contexts.remove(context.getId());
+ }
+ Optional<byte[]> err = context.getData();
+ if (err.isPresent()) {
+ JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
+ }
+ }
+ }
+ }
+
+ /**
+ * Receive notification that a ContextMessage has been received
+ */
+ final class ContextMessageHandler implements EventHandler<ContextMessage> {
+ @Override
+ public void onNext(final ContextMessage message) {
+ LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
+ try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(message.get().toString())) {
+ if (JobDriver.this.contextMessageHandler != 0) {
+ ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
+ // if CLR implements the context message handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
+ NativeInterop.ClrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, contextMessageBridge);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
new file mode 100644
index 0000000..b1473ee
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.client.ClientConfiguration;
+import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.util.logging.LoggingScopeImpl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Clr Bridge example - main class.
+ */
+public final class Launch {
+
+ /**
+ * Number of REEF worker threads in local mode. We assume maximum 10 evaluators can be requested on local runtime
+ */
+ private static final int NUM_LOCAL_THREADS = 10;
+ /**
+ * Standard Java logger
+ */
+ private static final Logger LOG = Logger.getLogger(Launch.class.getName());
+
+ /**
+ * This class should not be instantiated.
+ */
+ private Launch() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+ /**
+ * Parse the command line arguments.
+ *
+ * @param args command line arguments, as passed to main()
+ * @return Configuration object.
+ * @throws org.apache.reef.tang.exceptions.BindException configuration error.
+ * @throws java.io.IOException error reading the configuration.
+ */
+ private static Configuration parseCommandLine(final String[] args)
+ throws BindException, IOException {
+ final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder();
+ final CommandLine cl = new CommandLine(confBuilder);
+ cl.registerShortNameOfClass(Local.class);
+ cl.registerShortNameOfClass(NumRuns.class);
+ cl.registerShortNameOfClass(WaitTimeForDriver.class);
+ cl.registerShortNameOfClass(DriverMemoryInMb.class);
+ cl.registerShortNameOfClass(DriverIdentifier.class);
+ cl.registerShortNameOfClass(DriverJobSubmissionDirectory.class);
+ cl.registerShortNameOfClass(Submit.class);
+ cl.processCommandLine(args);
+ return confBuilder.build();
+ }
+
+ private static Configuration cloneCommandLineConfiguration(final Configuration commandLineConf)
+ throws InjectionException, BindException {
+ final Injector injector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder();
+ cb.bindNamedParameter(NumRuns.class, String.valueOf(injector.getNamedInstance(NumRuns.class)));
+ return cb.build();
+ }
+
+ /**
+ * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+ *
+ * @param args Command line arguments, as passed into main().
+ * @return (immutable) TANG Configuration object.
+ * @throws org.apache.reef.tang.exceptions.BindException if configuration commandLineInjector fails.
+ * @throws org.apache.reef.tang.exceptions.InjectionException if configuration commandLineInjector fails.
+ * @throws java.io.IOException error reading the configuration.
+ */
+ private static Configuration getClientConfiguration(final String[] args)
+ throws BindException, InjectionException, IOException {
+
+ try (final LoggingScope ls = LoggingScopeFactory.getNewLoggingScope(Level.INFO, "Launch::getClientConfiguration")) {
+ final Configuration commandLineConf = parseCommandLine(args);
+
+ final Configuration clientConfiguration = ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
+ .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
+ //.set(ClientConfiguration.ON_WAKE_ERROR, JobClient.WakeErrorHandler.class )
+ .build();
+
+ // TODO: Remove the injector, have stuff injected.
+ final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+ final Configuration runtimeConfiguration;
+ if (isLocal) {
+ LOG.log(Level.INFO, "Running on the local runtime");
+ runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+ .build();
+ } else {
+ LOG.log(Level.INFO, "Running on YARN");
+ runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ }
+
+ return Tang.Factory.getTang()
+ .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
+ cloneCommandLineConfiguration(commandLineConf))
+ .build();
+ }
+ }
+
+ /**
+ * Main method that starts the CLR Bridge from Java
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) {
+ LOG.log(Level.INFO, "Entering Launch at :::" + new Date());
+ try {
+ if (args == null || args.length == 0) {
+ throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied.");
+ }
+ final File dotNetFolder = new File(args[0]).getAbsoluteFile();
+ String[] removedArgs = Arrays.copyOfRange(args, 1, args.length);
+
+ final Configuration config = getClientConfiguration(removedArgs);
+ final Injector commandLineInjector = Tang.Factory.getTang().newInjector(parseCommandLine(removedArgs));
+ final int waitTime = commandLineInjector.getNamedInstance(WaitTimeForDriver.class);
+ final int driverMemory = commandLineInjector.getNamedInstance(DriverMemoryInMb.class);
+ final String driverIdentifier = commandLineInjector.getNamedInstance(DriverIdentifier.class);
+ final String jobSubmissionDirectory = commandLineInjector.getNamedInstance(DriverJobSubmissionDirectory.class);
+ final boolean submit = commandLineInjector.getNamedInstance(Submit.class);
+ final Injector injector = Tang.Factory.getTang().newInjector(config);
+ final JobClient client = injector.getInstance(JobClient.class);
+ client.setDriverInfo(driverIdentifier, driverMemory, jobSubmissionDirectory);
+
+ if (submit) {
+ client.submit(dotNetFolder, true, null);
+ client.waitForCompletion(waitTime);
+ } else {
+ client.submit(dotNetFolder, false, config);
+ client.waitForCompletion(0);
+ }
+
+
+ LOG.info("Done!");
+ } catch (final BindException | InjectionException | IOException ex) {
+ LOG.log(Level.SEVERE, "Job configuration error", ex);
+ }
+ }
+
+ /**
+ * Command line parameter: number of experiments to run.
+ */
+ @NamedParameter(doc = "Number of times to run the command",
+ short_name = "num_runs", default_value = "1")
+ public static final class NumRuns implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter = true to run locally, or false to run on YARN.
+ */
+ @NamedParameter(doc = "Whether or not to run on the local runtime",
+ short_name = "local", default_value = "true")
+ public static final class Local implements Name<Boolean> {
+ }
+
+ /**
+ * Command line parameter, number of seconds to wait till driver finishes ,
+ * = -1 : waits forever
+ * = 0: exit immediately without wait for driver.
+ */
+ @NamedParameter(doc = "Whether or not to wait for driver to finish",
+ short_name = "wait_time", default_value = "-1")
+ public static final class WaitTimeForDriver implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter, driver memory, in MB
+ */
+ @NamedParameter(doc = "memory allocated to driver JVM",
+ short_name = "driver_memory", default_value = "512")
+ public static final class DriverMemoryInMb implements Name<Integer> {
+ }
+
+ /**
+ * Command line parameter, driver identifier
+ */
+ @NamedParameter(doc = "driver identifier for clr bridge",
+ short_name = "driver_id", default_value = "ReefClrBridge")
+ public static final class DriverIdentifier implements Name<String> {
+ }
+
+ /**
+ * Command line parameter = true to submit the job with driver config, or false to write config to current directory
+ */
+ @NamedParameter(doc = "Whether or not to submit the reef job after driver config is constructed",
+ short_name = "submit", default_value = "true")
+ public static final class Submit implements Name<Boolean> {
+ }
+
+ /**
+ * Command line parameter, job submission directory, if set, user should guarantee its uniqueness
+ */
+ @NamedParameter(doc = "driver job submission directory",
+ short_name = "submission_directory", default_value = "empty")
+ public static final class DriverJobSubmissionDirectory implements Name<String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java
new file mode 100644
index 0000000..ba2a5cb
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/LaunchHeadless.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge.generic;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.REEF;
+import org.apache.reef.runtime.common.client.REEFImplementation;
+import org.apache.reef.runtime.yarn.client.YarnClientConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+
+import java.io.File;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Clr Bridge example - main class.
+ */
+public final class LaunchHeadless {
+
+ /**
+ * Standard Java logger
+ */
+ private static final Logger LOG = Logger.getLogger(LaunchHeadless.class.getName());
+
+ /**
+ * This class should not be instantiated.
+ */
+ private LaunchHeadless() {
+ throw new RuntimeException("Do not instantiate this class!");
+ }
+
+
+ /**
+ * Parse command line arguments and create TANG configuration ready to be submitted to REEF.
+ *
+ * @param args Command line arguments, as passed into main().
+ * @return (immutable) TANG Configuration object.
+ * @throws org.apache.reef.tang.exceptions.BindException if configuration commandLineInjector fails.
+ * @throws org.apache.reef.tang.exceptions.InjectionException if configuration commandLineInjector fails.
+ * @throws java.io.IOException error reading the configuration.
+ */
+
+ /**
+ * Main method that starts the CLR Bridge from Java
+ *
+ * @param args command line parameters.
+ */
+ public static void main(final String[] args) {
+ try {
+ if (args == null || args.length == 0) {
+ throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied.");
+ }
+ final File dotNetFolder = new File(args[0]).getAbsoluteFile();
+
+ ConfigurationModule driverConfigModule = JobClient.getDriverConfiguration();
+
+ ConfigurationModule result = driverConfigModule;
+ for (final File f : dotNetFolder.listFiles()) {
+ if (f.canRead() && f.exists() && f.isFile()) {
+ result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
+ }
+ }
+
+ driverConfigModule = result;
+ Configuration driverConfiguration = Configurations.merge(driverConfigModule.build(), JobClient.getHTTPConfiguration());
+
+ LOG.log(Level.INFO, "Running on YARN");
+
+ final Configuration runtimeConfiguration = YarnClientConfiguration.CONF.build();
+
+ final REEF reef = Tang.Factory.getTang().newInjector(runtimeConfiguration).getInstance(REEFImplementation.class);
+ reef.submit(driverConfiguration);
+
+ LOG.info("Done!");
+ } catch (final BindException | InjectionException ex) {
+ LOG.log(Level.SEVERE, "Job configuration error", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java
new file mode 100644
index 0000000..d93f6f4
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Generic java bridge driver/client
+ */
+package org.apache.reef.javabridge.generic;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java
new file mode 100644
index 0000000..46629c9
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRBufferedLogHandler.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.logging;
+
+import org.apache.reef.javabridge.NativeInterop;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.SimpleFormatter;
+
+/**
+ * Logging Handler to intercept java logs and transfer them
+ * to the CLR side via the reef-bridge.
+ * <p/>
+ * Logs are buffered to avoid the cost of reef-bridge function calls.
+ * A thread is also scheduled to flush the log buffer at a certain interval,
+ * in case the log buffer remains unfilled for an extended period of time.
+ */
+public class CLRBufferedLogHandler extends Handler {
+ private static final int BUFFER_LEN = 10;
+ private static final int NUM_THREADS = 1;
+ private static final long LOG_SCHEDULE_PERIOD = 15; // seconds
+ private SimpleFormatter formatter;
+ private ArrayList<LogRecord> logs;
+ private boolean driverInitialized;
+ private ScheduledThreadPoolExecutor logScheduler;
+
+ @Inject
+ public CLRBufferedLogHandler() {
+ super();
+ this.formatter = new SimpleFormatter();
+ this.logs = new ArrayList<LogRecord>();
+ this.driverInitialized = false;
+ this.logScheduler = new ScheduledThreadPoolExecutor(NUM_THREADS);
+ }
+
+ /**
+ * Signals the java-bridge has been initialized and that we can begin logging.
+ * Usually called from the StartHandler after the driver is up.
+ */
+ public void setDriverInitialized() {
+ synchronized (this) {
+ this.driverInitialized = true;
+ }
+ startLogScheduler();
+ }
+
+ /**
+ * Called whenever a log message is received on the java side.
+ * <p/>
+ * Adds the log record to the log buffer. If the log buffer is full and
+ * the driver has already been initialized, flush the buffer of the logs.
+ */
+ @Override
+ public void publish(LogRecord record) {
+ if (record == null)
+ return;
+
+ if (!isLoggable(record))
+ return;
+
+ synchronized (this) {
+ this.logs.add(record);
+ if (!this.driverInitialized || this.logs.size() < BUFFER_LEN)
+ return;
+ }
+
+ logAll();
+ }
+
+ @Override
+ public void flush() {
+ logAll();
+ }
+
+ /**
+ * Flushes the remaining buffered logs and shuts down the log scheduler thread.
+ */
+ @Override
+ public synchronized void close() throws SecurityException {
+ if (driverInitialized) {
+ this.logAll();
+ }
+ this.logScheduler.shutdown();
+ }
+
+ /**
+ * Starts a thread to flush the log buffer on an interval.
+ * <p/>
+ * This will ensure that logs get flushed periodically, even
+ * if the log buffer is not full.
+ */
+ private void startLogScheduler() {
+ this.logScheduler.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ CLRBufferedLogHandler.this.logAll();
+ }
+ }, 0, LOG_SCHEDULE_PERIOD, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Flushes the log buffer, logging each buffered log message using
+ * the reef-bridge log function.
+ */
+ private void logAll() {
+ synchronized (this) {
+ final StringBuilder sb = new StringBuilder();
+ Level highestLevel = Level.FINEST;
+ for (final LogRecord record : this.logs) {
+ sb.append(formatter.format(record));
+ sb.append("\n");
+ if (record.getLevel().intValue() > highestLevel.intValue()) {
+ highestLevel = record.getLevel();
+ }
+ }
+ try {
+ final int level = getLevel(highestLevel);
+ NativeInterop.ClrBufferedLog(level, sb.toString());
+ } catch (Exception e) {
+ System.err.println("Failed to perform CLRBufferedLogHandler");
+ }
+
+ this.logs.clear();
+ }
+ }
+
+ /**
+ * Returns the integer value of the log record's level to be used
+ * by the CLR Bridge log function.
+ */
+ private int getLevel(Level recordLevel) {
+ if (recordLevel.equals(Level.OFF)) {
+ return 0;
+ } else if (recordLevel.equals(Level.SEVERE)) {
+ return 1;
+ } else if (recordLevel.equals(Level.WARNING)) {
+ return 2;
+ } else if (recordLevel.equals(Level.ALL)) {
+ return 4;
+ } else {
+ return 3;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java
new file mode 100644
index 0000000..7d82937
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/CLRLoggingConfig.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util.logging;
+
+import java.io.IOException;
+import java.util.logging.LogManager;
+
+public final class CLRLoggingConfig {
+
+ public CLRLoggingConfig() throws IOException {
+ LogManager.getLogManager().readConfiguration(
+ Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream("com/microsoft/reef/clr.logging.properties"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java
new file mode 100644
index 0000000..e0e79ce
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/util/logging/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Logging handler for clr bridge
+ */
+package org.apache.reef.util.logging;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties b/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties
new file mode 100644
index 0000000..41c4024
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge-java/src/main/resources/org/apache/reef/clr.logging.properties
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Properties file which configures the operation of the JDK
+# logging facility.
+
+# The system will look for this config file, first using
+# a System property specified at startup:
+#
+# >java -Djava.utils.logging.config.file=myLoggingConfigFilePath
+#
+# If this property is not specified, then the config file is
+# retrieved from its default location at:
+#
+# JDK_HOME/jre/lib/logging.properties
+
+# Global logging properties.
+# ------------------------------------------
+# The set of handlers to be loaded upon startup.
+# Comma-separated list of class names.
+# (? LogManager docs say no comma here, but JDK example has comma.)
+# handlers=java.utils.logging.FileHandler, java.utils.logging.ConsoleHandler
+handlers=java.util.logging.ConsoleHandler,org.apache.reef.util.logging.CLRBufferedLogHandler
+
+java.util.logging.SimpleFormatter.format=%1$tF %1$tT,%1$tL %4$s %2$s - %5$s%6$s%n
+
+# Default global logging level.
+# Loggers and Handlers may override this level
+.level=ALL
+
+# Loggers
+# ------------------------------------------
+# Loggers are usually attached to packages.
+# Here, the level for each package is specified.
+# The global level is used by default, so levels
+# specified here simply act as an override.
+
+# org.apache.reef.examples.level=FINEST
+# org.apache.reef.tang.level=INFO
+
+# Handlers
+# -----------------------------------------
+
+# --- ConsoleHandler ---
+# Override of global logging level
+java.util.logging.ConsoleHandler.level=FINEST
+java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
+
+# --- FileHandler ---
+# Override of global logging level
+java.util.logging.FileHandler.level=FINEST
+
+# Naming style for the output file:
+# (The output file is placed in the directory
+# defined by the "user.home" System property.)
+java.util.logging.FileHandler.pattern=%h/reef.%u.log
+
+# Limiting size of output file in bytes:
+java.util.logging.FileHandler.limit=512000
+
+# Number of output files to cycle through, by appending an
+# integer to the base file name:
+java.util.logging.FileHandler.count=100
+
+# Style of output (Simple or XML):
+java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-bridge-project/reef-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-project/reef-bridge/pom.xml b/lang/java/reef-bridge-project/reef-bridge/pom.xml
new file mode 100644
index 0000000..774e5f8
--- /dev/null
+++ b/lang/java/reef-bridge-project/reef-bridge/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>reef-bridge</artifactId>
+ <name>REEF Bridge</name>
+ <description>Bridge between JVM and CLR.</description>
+
+
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-bridge-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-local</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-runtime-yarn</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-checkpoint</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-bridge-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-bridge-clr</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>unpack-dependencies</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>unpack-dependencies</goal>
+ </goals>
+
+ <configuration>
+ <includeArtifactIds>reef-bridge-java,reef-bridge-clr</includeArtifactIds>
+ <outputDirectory>
+ ${project.build.directory}/classes/ReefDriverAppDlls
+ </outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <addClasspath>false</addClasspath>
+ <classpathPrefix>lib/</classpathPrefix>
+ <mainClass>org.apache.reef.javabridge.JavaBridge</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/maven-eclipse.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/maven-eclipse.xml b/lang/java/reef-checkpoint/maven-eclipse.xml
new file mode 100644
index 0000000..6c6b5ae
--- /dev/null
+++ b/lang/java/reef-checkpoint/maven-eclipse.xml
@@ -0,0 +1,28 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project default="copy-resources">
+ <target name="init"/>
+ <target name="copy-resources" depends="init">
+ <copy todir="target/classes/META-INF/conf" filtering="false">
+ <fileset dir="src/main/conf" includes="*.xml|*.properties" excludes="**/*.java"/>
+ </copy>
+ </target>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/pom.xml b/lang/java/reef-checkpoint/pom.xml
new file mode 100644
index 0000000..147a158
--- /dev/null
+++ b/lang/java/reef-checkpoint/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.reef</groupId>
+ <artifactId>reef-project</artifactId>
+ <version>0.11.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>reef-checkpoint</artifactId>
+ <name>REEF Checkpoint</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <test.build.webapps>${project.build.directory}/test-classes/webapps</test.build.webapps>
+ </properties>
+ <build>
+ <resources>
+ <resource>
+ <targetPath>META-INF/conf</targetPath>
+ <filtering>false</filtering>
+ <directory>${basedir}/src/main/conf</directory>
+ <includes>
+ <include>*.xml</include>
+ <include>*.properties</include>
+ <include>webapps/**</include>
+ </includes>
+ <excludes>
+ </excludes>
+ </resource>
+ </resources>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>reef-annotations</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tang</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
new file mode 100644
index 0000000..cd7cabe
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class represent the identified (memento) for a checkpoint. It is allowed
+ * to contain small amount of metadata about a checkpoint and must provide sufficient
+ * information to the corresponding CheckpointService to locate and retrieve the
+ * data contained in the checkpoint.
+ */
+public interface CheckpointID extends Writable {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
new file mode 100644
index 0000000..4415c46
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+/**
+ * This class represent a naming service for checkpoints.
+ */
+public interface CheckpointNamingService {
+
+ /**
+ * Generate a new checkpoint Name
+ *
+ * @return the checkpoint name
+ */
+ public String getNewName();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
new file mode 100644
index 0000000..17af4ce
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * The CheckpointService provides a simple API to store and retrieve the state of a task.
+ * <p/>
+ * Checkpoints are atomic, single-writer, write-once, multiple-readers, ready-many type of objects.
+ * This is provided by releasing the CheckpointID for a checkpoint only upon commit of the checkpoint,
+ * and by preventing a checkpoint to be re-opened for writes.
+ * <p/>
+ * Non-functional properties such as durability, availability, compression, garbage collection,
+ * quotas are left to the implementation.
+ * <p/>
+ * This API is envisioned as the basic building block for a checkpoint service, on top of which richer
+ * interfaces can be layered (e.g., frameworks providing object-serialization, checkpoint metadata and
+ * provenance, etc.)
+ */
+public interface CheckpointService {
+
+ /**
+ * This method creates a checkpoint and provide a channel to write to it.
+ * The name/location of the checkpoint are unknown to the user as of this time, in fact,
+ * the CheckpointID is not released to the user until commit is called. This makes enforcing
+ * atomicity of writes easy.
+ *
+ * @return a CheckpointWriteChannel that can be used to write to the checkpoint
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ CheckpointWriteChannel create() throws IOException, InterruptedException;
+
+ /**
+ * Used to finalize and existing checkpoint. It returns the CheckpointID that can be later
+ * used to access (read-only) this checkpoint. This guarantees atomicity of the checkpoint.
+ *
+ * @param channel the CheckpointWriteChannel to commit
+ * @return a CheckpointID
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ CheckpointID commit(CheckpointWriteChannel channel) throws IOException, InterruptedException;
+
+ /**
+ * Dual to commit, it aborts the current checkpoint. Garbage collection choices are
+ * left to the implementation. The CheckpointID is not generated nor released to the
+ * user so the checkpoint is not accessible.
+ *
+ * @param channel the CheckpointWriteChannel to abort
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void abort(CheckpointWriteChannel channel) throws IOException, InterruptedException;
+
+ /**
+ * Given a CheckpointID returns a reading channel.
+ *
+ * @param checkpointId CheckpointID for the checkpoint to be opened
+ * @return a CheckpointReadChannel
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ CheckpointReadChannel open(CheckpointID checkpointId) throws IOException, InterruptedException;
+
+ /**
+ * It discards an existing checkpoint identified by its CheckpointID.
+ *
+ * @param checkpointId CheckpointID for the checkpoint to be deleted
+ * @return a boolean confirming success of the deletion
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean delete(CheckpointID checkpointId) throws IOException, InterruptedException;
+
+ interface CheckpointWriteChannel extends WritableByteChannel {
+ }
+
+ interface CheckpointReadChannel extends ReadableByteChannel {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
new file mode 100644
index 0000000..c71a191
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Simple naming service that generates a random checkpoint name.
+ */
+public class RandomNameCNS implements CheckpointNamingService {
+
+ private final String prefix;
+
+ @Inject
+ public RandomNameCNS(@Parameter(PREFIX.class) final String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public String getNewName() {
+ return this.prefix + RandomStringUtils.randomAlphanumeric(8);
+ }
+
+ @NamedParameter(doc = "The prefix used for the random names returned.", default_value = "checkpoint_")
+ public static class PREFIX implements Name<String> {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
new file mode 100644
index 0000000..69e3cbd
--- /dev/null
+++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * A naming service that simply returns the name it has been initialized with.
+ */
+public class SimpleNamingService implements CheckpointNamingService {
+
+ private final String name;
+
+ @Inject
+ public SimpleNamingService(@Parameter(CheckpointName.class) final String name) {
+ this.name = "checkpoint_" + name;
+ }
+
+ /**
+ * Generate a new checkpoint Name.
+ *
+ * @return the checkpoint name
+ */
+ @Override
+ public String getNewName() {
+ return this.name;
+ }
+
+ /**
+ * Prefix for checkpoints.
+ */
+ @NamedParameter(doc = "Checkpoint prefix.", short_name = "checkpoint_prefix", default_value = "reef")
+ public static final class CheckpointName implements Name<String> {
+ }
+}