You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by mo...@apache.org on 2018/03/28 19:05:20 UTC

[3/4] reef git commit: [REEF-1965] Implement REEF runtime for Azure Batch

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java
new file mode 100644
index 0000000..96f247a
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import com.microsoft.azure.batch.protocol.models.CloudTask;
+import com.microsoft.azure.batch.protocol.models.TaskState;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.proto.EvaluatorShimProtocol;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.storage.AzureStorageClient;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.azbatch.util.RemoteIdentifierParser;
+import org.apache.reef.runtime.azbatch.util.TaskStatusMapper;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
+import org.apache.reef.runtime.common.files.*;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier.RUNTIME_NAME;
+
+/**
+ * The Driver's view of evaluator shims running in the cluster. This class serves the following purposes:
+ *    1. listens for evaluator shim status messages signaling that the shim is online and ready to start the evaluator
+ *    process.
+ *    2. listens for {@link ResourceLaunchEvent} events and sends commands to the evaluator shims to start the
+ *    evaluator process.
+ *    3. listens for {@link ResourceReleaseEvent} events and sends terminate commands to the evaluator shims.
+ *    4. triggers {@link org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent}
+ *       events to update REEF Common on runtime status.
+ *    5. triggers {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent}
+ *       events to update REEF Common on container statuses.
+ */
+@Private
+@DriverSide
+public final class AzureBatchEvaluatorShimManager
+    implements EventHandler<RemoteMessage<EvaluatorShimProtocol.EvaluatorShimStatusProto>> {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchEvaluatorShimManager.class.getName());
+
+  private static final int EVALUATOR_SHIM_MEMORY_MB = 64;
+
+  private final Map<String, ResourceRequestEvent> outstandingResourceRequests;
+  private final AtomicInteger outstandingResourceRequestCount;
+
+  private final Map<String, CloudTask> failedResources;
+
+  private final AutoCloseable evaluatorShimCommandChannel;
+
+  private final AzureStorageClient azureStorageClient;
+  private final REEFFileNames reefFileNames;
+  private final AzureBatchFileNames azureBatchFileNames;
+  private final RemoteManager remoteManager;
+  private final AzureBatchHelper azureBatchHelper;
+  private final AzureBatchEvaluatorShimConfigurationProvider evaluatorShimConfigurationProvider;
+  private final JobJarMaker jobJarMaker;
+  private final CommandBuilder launchCommandBuilder;
+  private final REEFEventHandlers reefEventHandlers;
+  private final ConfigurationSerializer configurationSerializer;
+
+  private final Evaluators evaluators;
+
+  /**
+   * Injection constructor: stores the collaborators and subscribes to evaluator shim status messages.
+   */
+  @Inject
+  AzureBatchEvaluatorShimManager(
+      final AzureStorageClient azureStorageClient,
+      final REEFFileNames reefFileNames,
+      final AzureBatchFileNames azureBatchFileNames,
+      final RemoteManager remoteManager,
+      final REEFEventHandlers reefEventHandlers,
+      final Evaluators evaluators,
+      final CommandBuilder launchCommandBuilder,
+      final AzureBatchHelper azureBatchHelper,
+      final JobJarMaker jobJarMaker,
+      final AzureBatchEvaluatorShimConfigurationProvider evaluatorShimConfigurationProvider,
+      final ConfigurationSerializer configurationSerializer) {
+    this.azureStorageClient = azureStorageClient;
+    this.reefFileNames = reefFileNames;
+    this.azureBatchFileNames = azureBatchFileNames;
+    this.remoteManager = remoteManager;
+    this.reefEventHandlers = reefEventHandlers;
+    this.evaluators = evaluators;
+    this.launchCommandBuilder = launchCommandBuilder;
+    this.azureBatchHelper = azureBatchHelper;
+    this.jobJarMaker = jobJarMaker;
+    this.evaluatorShimConfigurationProvider = evaluatorShimConfigurationProvider;
+    this.configurationSerializer = configurationSerializer;
+
+    // Bookkeeping for pending requests and for shims whose task was already COMPLETED at allocation time.
+    this.outstandingResourceRequests = new ConcurrentHashMap<>();
+    this.outstandingResourceRequestCount = new AtomicInteger();
+    this.failedResources = new ConcurrentHashMap<>();
+
+    // Subscribe last: registering hands `this` to the RemoteManager, so a shim status message may be
+    // dispatched to onNext() from here on. Every field has to be assigned before that can happen.
+    this.evaluatorShimCommandChannel = remoteManager
+        .registerHandler(EvaluatorShimProtocol.EvaluatorShimStatusProto.class, this);
+  }
+
+  /**
+   * Handles a resource request by adding a task to the existing Azure Batch job (the equivalent of requesting a
+   * container) and recording the request as outstanding until the evaluator shim reports back to the driver.
+   *
+   * @param resourceRequestEvent resource request event.
+   * @param containerId container id for the resource. It will be used as the task id for Azure Batch task.
+   * @param jarFileUri Azure Storage SAS URI of the JAR file containing libraries required by the evaluator shim.
+   * @throws RuntimeException wrapping the {@link IOException} if the Azure Batch task could not be created.
+   */
+  public void onResourceRequested(final ResourceRequestEvent resourceRequestEvent,
+                                  final String containerId,
+                                  final URI jarFileUri) {
+    try {
+      createAzureBatchTask(containerId, jarFileUri);
+      this.outstandingResourceRequests.put(containerId, resourceRequestEvent);
+      this.outstandingResourceRequestCount.incrementAndGet();
+      this.updateRuntimeStatus();
+    } catch (IOException e) {
+      LOG.log(Level.SEVERE, "Failed to create Azure Batch task for containerId = " + containerId, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Callback through which the RemoteManager delivers status messages sent by evaluator shims.
+   * Only {@code ONLINE} messages are acted upon: they are treated as the fulfilment of a resource request.
+   *
+   * @param statusMessage status message sent by an evaluator shim; carries its container id and remote id.
+   */
+  @Override
+  public void onNext(final RemoteMessage<EvaluatorShimProtocol.EvaluatorShimStatusProto> statusMessage) {
+    final EvaluatorShimProtocol.EvaluatorShimStatusProto shimStatus = statusMessage.getMessage();
+    final String shimContainerId = shimStatus.getContainerId();
+    final String shimRemoteId = shimStatus.getRemoteIdentifier();
+    final String reportedStatus = shimStatus.getStatus().toString();
+
+    LOG.log(Level.INFO, "Got a status message from evaluator shim = {0} with containerId = {1} and status = {2}.",
+        new String[]{shimRemoteId, shimContainerId, reportedStatus});
+
+    if (shimStatus.getStatus() == EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE) {
+      // No CloudTask is passed on this path; the shim itself reported that it is up.
+      this.onResourceAllocated(shimContainerId, shimRemoteId, Optional.<CloudTask>empty());
+    } else {
+      LOG.log(Level.SEVERE, "Unexpected status returned from the evaluator shim: {0}. Ignoring the message.",
+          reportedStatus);
+    }
+  }
+
+  /**
+   * Completes a pending resource request and reports the new node and the allocation to REEF Common.
+   * There are two callers:
+   *    1. {@link #onNext}, once an evaluator shim has reported that it started successfully.
+   *    2. {@link AzureBatchTaskStatusAlarmHandler}, when it detects that the evaluator shim failed before
+   *       sending its status message.
+   *
+   * @param containerId id of the container.
+   * @param remoteId remote address for the allocated container.
+   * @param cloudTask Azure Batch task which corresponds to the container, if the caller has one.
+   */
+  public void onResourceAllocated(final String containerId,
+                                  final String remoteId,
+                                  final Optional<CloudTask> cloudTask) {
+    final ResourceRequestEvent pendingRequest = this.outstandingResourceRequests.remove(containerId);
+
+    if (pendingRequest == null) {
+      LOG.log(Level.WARNING, "No outstanding resource request found for container id = {0}.", containerId);
+      this.updateRuntimeStatus();
+      return;
+    }
+
+    this.outstandingResourceRequestCount.decrementAndGet();
+    // The Azure Batch task is expected to be in 'RUNNING' state here. A task that is already
+    // 'COMPLETED' cannot receive instructions any more, so it is recorded as failed.
+    if (cloudTask.isPresent() && TaskState.COMPLETED.equals(cloudTask.get().state())) {
+      this.failedResources.put(containerId, cloudTask.get());
+    }
+
+    LOG.log(Level.FINEST, "Notifying REEF of a new node: {0}", remoteId);
+    this.reefEventHandlers.onNodeDescriptor(NodeDescriptorEventImpl.newBuilder()
+        .setIdentifier(RemoteIdentifierParser.parseNodeId(remoteId))
+        .setHostName(RemoteIdentifierParser.parseIp(remoteId))
+        .setPort(RemoteIdentifierParser.parsePort(remoteId))
+        .setMemorySize(pendingRequest.getMemorySize().get())
+        .build());
+
+    LOG.log(Level.FINEST, "Triggering a new ResourceAllocationEvent for remoteId = {0}.", remoteId);
+    this.reefEventHandlers.onResourceAllocation(
+        ResourceEventImpl.newAllocationBuilder()
+            .setIdentifier(containerId)
+            .setNodeId(RemoteIdentifierParser.parseNodeId(remoteId))
+            .setResourceMemory(pendingRequest.getMemorySize().get())
+            .setVirtualCores(pendingRequest.getVirtualCores().get())
+            .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME)
+            .build());
+
+    this.updateRuntimeStatus();
+  }
+
+  /**
+   * Event handler method for {@link ResourceLaunchEvent}. A shim already known to have failed is reported to
+   * REEF as failed; otherwise the evaluator launch command is sent to the shim to start the evaluator.
+   *
+   * @param resourceLaunchEvent an instance of {@link ResourceLaunchEvent}.
+   * @param command OS command to launch the evaluator process.
+   * @param evaluatorConfigurationString evaluator configuration serialized as a String.
+   */
+  public void onResourceLaunched(final ResourceLaunchEvent resourceLaunchEvent,
+                                 final String command,
+                                 final String evaluatorConfigurationString) {
+    final String resourceId = resourceLaunchEvent.getIdentifier();
+    // Single lookup: a concurrent release may remove the entry between a containsKey() and a get().
+    final CloudTask failedTask = this.failedResources.get(resourceId);
+    if (failedTask != null) {
+      LOG.log(Level.FINE, "ResourceLaunch event triggered on a failed container. " +
+          "Notifying REEF of failed container.");
+      this.onAzureBatchTaskStatus(failedTask);
+    } else if (this.evaluators.get(resourceId).isPresent()) {
+      LOG.log(Level.FINE, "Preparing to launch resourceId = {0}", resourceId);
+      this.launchEvaluator(resourceLaunchEvent, command, evaluatorConfigurationString);
+    } else {
+      LOG.log(Level.WARNING, "Received a ResourceLaunch event for an unknown resourceId = {0}", resourceId);
+    }
+
+    this.updateRuntimeStatus();
+  }
+
+  /**
+   * Event handler method for {@link ResourceReleaseEvent}. Sends a TERMINATE command to the appropriate evaluator
+   * shim. A shim recorded as failed is only forgotten; an unknown resource id is logged and otherwise ignored.
+   * @param resourceReleaseEvent identifies the resource that is being released.
+   */
+  public void onResourceReleased(final ResourceReleaseEvent resourceReleaseEvent) {
+    final String resourceId = resourceReleaseEvent.getIdentifier();
+    // REEF Common will trigger a ResourceReleaseEvent even if the resource has failed. Since we know that the shim
+    // has already failed, we can safely ignore this.
+    if (this.failedResources.remove(resourceId) != null) {
+      LOG.log(Level.INFO, "Received a ResourceReleaseEvent for a failed shim with resourceId = {0}. Ignoring.",
+          resourceId);
+    } else if (this.evaluators.get(resourceId).isPresent()) {
+      final String resourceRemoteId = getResourceRemoteId(resourceId); // throws for unknown evaluators
+      EventHandler<EvaluatorShimProtocol.EvaluatorShimControlProto> handler =
+          this.remoteManager.getHandler(resourceRemoteId, EvaluatorShimProtocol.EvaluatorShimControlProto.class);
+      LOG.log(Level.INFO, "Sending TERMINATE command to the shim with remoteId = {0}.", resourceRemoteId);
+      handler.onNext(
+          EvaluatorShimProtocol.EvaluatorShimControlProto
+              .newBuilder()
+              .setCommand(EvaluatorShimProtocol.EvaluatorShimCommand.TERMINATE)
+              .build());
+      this.updateRuntimeStatus();
+    } else {
+      LOG.log(Level.WARNING, "Received a ResourceReleaseEvent for an unknown resourceId = {0}", resourceId);
+    }
+  }
+
+  /**
+   * Translates the state of an Azure Batch task into a
+   * {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent} and hands it to REEF.
+   *
+   * @param cloudTask an instance of {@link CloudTask}.
+   */
+  public void onAzureBatchTaskStatus(final CloudTask cloudTask) {
+    final ResourceStatusEventImpl.Builder statusEvent = ResourceStatusEventImpl.newBuilder()
+        .setIdentifier(cloudTask.id())
+        .setState(TaskStatusMapper.getReefTaskState(cloudTask))
+        .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME);
+
+    // An exit code is only attached for tasks in COMPLETED state.
+    if (TaskState.COMPLETED.equals(cloudTask.state())) {
+      statusEvent.setExitCode(cloudTask.executionInfo().exitCode());
+    }
+
+    this.reefEventHandlers.onResourceStatus(statusEvent.build());
+  }
+
+  /**
+   * Closes the evaluator shim command channel; a failure to close is logged and rethrown as RuntimeException.
+   */
+  public void onClose() {
+    try {
+      this.evaluatorShimCommandChannel.close();
+    } catch (Exception e) {
+      LOG.log(Level.WARNING, "An unexpected exception while closing the Evaluator Shim Command channel.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Packages every file found in the REEF global folder into the evaluator shim JAR and uploads it.
+   *
+   * @return SAS URI to where the evaluator shim JAR was uploaded.
+   */
+  public URI generateShimJarFile() {
+    try {
+      final Set<FileResource> libraries = new HashSet<>();
+
+      // File.listFiles() returns null when the path is not a directory or cannot be read; such a
+      // global folder simply contributes no files.
+      final File[] globalFolderEntries = new File(this.reefFileNames.getGlobalFolderPath()).listFiles();
+      if (globalFolderEntries != null) {
+        for (final File entry : globalFolderEntries) {
+          libraries.add(getFileResourceFromFile(entry, FileType.LIB));
+        }
+      }
+
+      final File shimJar = this.jobJarMaker.newBuilder()
+          .addGlobalFileSet(libraries)
+          .build();
+      return uploadFile(shimJar);
+    } catch (IOException ex) {
+      LOG.log(Level.SEVERE, "Failed to build JAR file", ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  private void updateRuntimeStatus() {
+    this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
+        .setName(RUNTIME_NAME)
+        .setState(State.RUNNING)
+        .setOutstandingContainerRequests(this.outstandingResourceRequestCount.get())
+        .build());
+  }
+
+  private void launchEvaluator(final ResourceLaunchEvent resourceLaunchEvent,
+                               final String command,
+                               final String evaluatorConfigurationString) {
+    String resourceId = resourceLaunchEvent.getIdentifier();
+    String resourceRemoteId = getResourceRemoteId(resourceId);
+
+    Set<FileResource> fileResources = resourceLaunchEvent.getFileSet();
+    String fileUrl = "";
+    if (!fileResources.isEmpty()) {
+      try {
+        File jarFile = writeFileResourcesJarFile(fileResources);
+        fileUrl = uploadFile(jarFile).toString();
+        LOG.log(Level.FINE, "Uploaded evaluator file resources to Azure Storage at {0}.", fileUrl);
+      } catch (IOException e) {
+        LOG.log(Level.SEVERE, "Failed to generate zip archive for evaluator file resources: {0}.", e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      LOG.log(Level.INFO, "No file resources found in ResourceLaunchEvent.");
+    }
+
+    LOG.log(Level.INFO, "Sending a command to the Evaluator shim with remoteId = {0} to start the evaluator.",
+        resourceRemoteId);
+    EventHandler<EvaluatorShimProtocol.EvaluatorShimControlProto> handler = this.remoteManager
+        .getHandler(resourceRemoteId, EvaluatorShimProtocol.EvaluatorShimControlProto.class);
+
+    handler.onNext(
+        EvaluatorShimProtocol.EvaluatorShimControlProto
+            .newBuilder()
+            .setCommand(EvaluatorShimProtocol.EvaluatorShimCommand.LAUNCH_EVALUATOR)
+            .setEvaluatorLaunchCommand(command)
+            .setEvaluatorConfigString(evaluatorConfigurationString)
+            .setEvaluatorFileResourcesUrl(fileUrl)
+            .build());
+  }
+
+  private String getEvaluatorShimLaunchCommand() {
+    return this.launchCommandBuilder.buildEvaluatorShimCommand( // shim memory constant, then its config path
+        EVALUATOR_SHIM_MEMORY_MB, this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
+  }
+
+  /**
+   * @return The name under which the evaluator shim configuration will be stored in
+   * REEF_BASE_FOLDER/LOCAL_FOLDER.
+   */
+  private FileResource getFileResourceFromFile(final File configFile, final FileType type) {
+    return FileResourceImpl.newBuilder()
+        .setName(configFile.getName())
+        .setPath(configFile.getPath())
+        .setType(type).build();
+  }
+
+  private void createAzureBatchTask(final String taskId, final URI jarFileUri) throws IOException {
+    // Write this task's shim configuration to a local file, upload it, then submit the task itself.
+    final String shimFileName = taskId + '-' + this.azureBatchFileNames.getEvaluatorShimConfigurationName();
+    final File shimFile = new File(this.reefFileNames.getLocalFolderPath(), shimFileName);
+    this.configurationSerializer.toFile(this.evaluatorShimConfigurationProvider.getConfiguration(taskId), shimFile);
+    final URI shimFileUri = this.uploadFile(shimFile);
+    this.azureBatchHelper.submitTask(this.azureBatchHelper.getAzureBatchJobId(), taskId, jarFileUri,
+        shimFileUri, getEvaluatorShimLaunchCommand());
+  }
+
+  private File writeFileResourcesJarFile(final Set<FileResource> fileResourceSet) throws IOException {
+    return this.jobJarMaker.newBuilder().addLocalFileSet(fileResourceSet).build(); // job JAR of the local file set
+  }
+
+  private URI uploadFile(final File jarFile) throws IOException {
+    // Every upload goes to the storage folder derived from the Azure Batch job id.
+    final String jobFolder = this.azureBatchFileNames.getStorageJobFolder(this.azureBatchHelper.getAzureBatchJobId());
+    LOG.log(Level.FINE, "Uploading {0} to {1}.", new Object[]{jarFile.getAbsolutePath(), jobFolder});
+    return this.azureStorageClient.uploadFile(jobFolder, jarFile);
+  }
+
+  private String getResourceRemoteId(final String resourceId) {
+    Optional<EvaluatorManager> optionalEvaluatorManager = this.evaluators.get(resourceId);
+
+    if (!optionalEvaluatorManager.isPresent()) {
+      LOG.log(Level.SEVERE, "Unknown evaluator with resourceId = {0}", resourceId);
+      throw new RuntimeException("Unknown evaluator with resourceId = " + resourceId);
+    }
+
+    NodeDescriptor nodeDescriptor = optionalEvaluatorManager.get().getEvaluatorDescriptor().getNodeDescriptor();
+    return (new SocketRemoteIdentifier(nodeDescriptor.getInetSocketAddress())).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java
new file mode 100644
index 0000000..b642c4d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceLaunchHandler} for Azure Batch; it delegates to {@link AzureBatchResourceManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceLaunchHandler implements ResourceLaunchHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceLaunchHandler.class.getName());
+  private final AzureBatchResourceManager resourceManager;
+
+  @Inject
+  AzureBatchResourceLaunchHandler(final AzureBatchResourceManager azureBatchResourceManager) {
+    this.resourceManager = azureBatchResourceManager;
+  }
+
+  /**
+   * This method is called when an allocated resource is to be launched.
+   *
+   * @param resourceLaunchEvent resource launch event.
+   */
+  @Override
+  public void onNext(final ResourceLaunchEvent resourceLaunchEvent) {
+    LOG.log(Level.FINEST, "Got ResourceLaunchEvent in AzureBatchResourceLaunchHandler");
+    this.resourceManager.onResourceLaunched(resourceLaunchEvent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java
new file mode 100644
index 0000000..8fbbf17
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The Driver's view of all resources in Azure Batch pool.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceManager {
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManager.class.getName());
+
+  private final Map<String, ResourceRequestEvent> containerRequests;
+  private final AtomicInteger containerCount;
+
+  private final ConfigurationSerializer configurationSerializer;
+  private final CommandBuilder launchCommandBuilder;
+  private final AzureBatchEvaluatorShimManager evaluatorShimManager;
+  private final AzureBatchTaskStatusAlarmHandler azureBatchTaskStatusAlarmHandler;
+
+  private final double jvmHeapFactor;
+
+  @Inject
+  AzureBatchResourceManager(
+      final ConfigurationSerializer configurationSerializer,
+      final CommandBuilder launchCommandBuilder,
+      final AzureBatchEvaluatorShimManager evaluatorShimManager,
+      final AzureBatchTaskStatusAlarmHandler azureBatchTaskStatusAlarmHandler,
+      @Parameter(JVMHeapSlack.class) final double jvmHeapSlack) {
+    this.configurationSerializer = configurationSerializer;
+    this.launchCommandBuilder = launchCommandBuilder;
+    this.evaluatorShimManager = evaluatorShimManager;
+    this.azureBatchTaskStatusAlarmHandler = azureBatchTaskStatusAlarmHandler;
+    this.jvmHeapFactor = 1.0 - jvmHeapSlack; // complement of the configured JVMHeapSlack
+    this.containerRequests = new ConcurrentHashMap<>();
+    this.containerCount = new AtomicInteger(0);
+  }
+
+  /**
+   * Handles a {@link ResourceRequestEvent}: builds the shim JAR once, then requests one container per resource.
+   *
+   * @param resourceRequestEvent the resource request event.
+   */
+  public void onResourceRequested(final ResourceRequestEvent resourceRequestEvent) {
+    LOG.log(Level.FINEST, "Got ResourceRequestEvent in AzureBatchResourceManager,");
+    final URI shimJarUri = this.evaluatorShimManager.generateShimJarFile();
+    for (int index = 0; index < resourceRequestEvent.getResourceCount(); index++) {
+      final String newContainerId = generateContainerId();
+      LOG.log(Level.FINE, "containerId in AzureBatchResourceManager {0}", newContainerId);
+      this.containerRequests.put(newContainerId, resourceRequestEvent);
+      this.containerCount.incrementAndGet();
+      this.evaluatorShimManager.onResourceRequested(resourceRequestEvent, newContainerId, shimJarUri);
+    }
+
+    // The task status alarm is enabled as long as at least one container is tracked.
+    if (this.containerCount.get() > 0) {
+      this.azureBatchTaskStatusAlarmHandler.enableAlarm();
+    }
+  }
+
+  /**
+   * Handles a {@link ResourceReleaseEvent}: forgets the container request, then notifies the shim manager.
+   *
+   * @param resourceReleaseEvent the resource release event.
+   */
+  public void onResourceReleased(final ResourceReleaseEvent resourceReleaseEvent) {
+    final String containerId = resourceReleaseEvent.getIdentifier();
+    LOG.log(Level.FINEST, "Got ResourceReleasedEvent for Id: {0} in AzureBatchResourceManager", containerId);
+
+    if (this.containerRequests.remove(containerId) != null) {
+      // Once the last tracked container is gone the task status alarm is switched off.
+      if (this.containerCount.decrementAndGet() <= 0) {
+        this.azureBatchTaskStatusAlarmHandler.disableAlarm();
+      }
+    } else {
+      LOG.log(Level.WARNING,
+          "Ignoring attempt to remove non-existent containerRequest for Id: {0} in AzureBatchResourceManager",
+          containerId);
+    }
+
+    this.evaluatorShimManager.onResourceReleased(resourceReleaseEvent);
+  }
+
+  /**
+   * Handles a {@link ResourceLaunchEvent}: builds the evaluator launch command from the memory size of the
+   * original request and forwards it, with the serialized evaluator configuration, to the shim manager.
+   * @param resourceLaunchEvent the resource launch event.
+   */
+  public void onResourceLaunched(final ResourceLaunchEvent resourceLaunchEvent) {
+    final String containerId = resourceLaunchEvent.getIdentifier();
+    LOG.log(Level.FINEST, "Got ResourceLaunchEvent for Id: {0} in AzureBatchResourceManager", containerId);
+    final int requestedMemory = this.containerRequests.get(containerId).getMemorySize().get();
+    final String evaluatorCommand = this.launchCommandBuilder.buildEvaluatorCommand(resourceLaunchEvent,
+        requestedMemory, this.jvmHeapFactor);
+    final String serializedConf = this.configurationSerializer.toString(resourceLaunchEvent.getEvaluatorConf());
+    this.evaluatorShimManager.onResourceLaunched(resourceLaunchEvent, evaluatorCommand, serializedConf);
+  }
+
+  private String generateContainerId() {
+    return UUID.randomUUID().toString(); // a fresh random UUID per requested container
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java
new file mode 100644
index 0000000..e07e91d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The {@link ResourceManagerStartHandler} of the Azure Batch runtime.
+ * It performs no work besides logging that the {@link RuntimeStart} event was received.
+ */
+@DriverSide
+@Private
+public final class AzureBatchResourceManagerStartHandler implements ResourceManagerStartHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManagerStartHandler.class.getName());
+
+  /**
+   * Injectable constructor; this handler has no dependencies.
+   */
+  @Inject
+  AzureBatchResourceManagerStartHandler() {
+  }
+
+  /**
+   * Logs the start of the runtime.
+   *
+   * @param runtimeStart the runtime start event; its contents are not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStart runtimeStart) {
+    LOG.log(Level.FINE, "Azure batch runtime has been started...");
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java
new file mode 100644
index 0000000..93ef918
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The {@link ResourceManagerStopHandler} of the Azure Batch runtime: when the {@link RuntimeStop} event
+ * arrives, it closes the {@link AzureBatchEvaluatorShimManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceManagerStopHandler implements ResourceManagerStopHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManagerStopHandler.class.getName());
+
+  private final AzureBatchEvaluatorShimManager shimManager;
+
+  @Inject
+  AzureBatchResourceManagerStopHandler(final AzureBatchEvaluatorShimManager azureBatchEvaluatorShimManager) {
+    this.shimManager = azureBatchEvaluatorShimManager;
+  }
+
+  /**
+   * Logs the stop event and then calls {@code onClose()} on the evaluator shim manager.
+   *
+   * @param runtimeStop the runtime stop event; its contents are not inspected.
+   */
+  @Override
+  public void onNext(final RuntimeStop runtimeStop) {
+    LOG.log(Level.FINE, "Azure batch runtime has been stopped...");
+    this.shimManager.onClose();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java
new file mode 100644
index 0000000..e3da49b
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A {@link ResourceReleaseHandler} for Azure Batch runtime. It hands every {@link ResourceReleaseEvent}
+ * over to the {@link AzureBatchResourceManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceReleaseHandler implements ResourceReleaseHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceReleaseHandler.class.getName());
+
+  private final AzureBatchResourceManager azureBatchResourceManager;
+
+  @Inject
+  AzureBatchResourceReleaseHandler(final AzureBatchResourceManager azureBatchResourceManager) {
+    this.azureBatchResourceManager = azureBatchResourceManager;
+  }
+
+  /**
+   * Logs the event and forwards it to {@link AzureBatchResourceManager#onResourceReleased}.
+   *
+   * @param resourceReleaseEvent the resource release event to forward.
+   */
+  @Override
+  public void onNext(final ResourceReleaseEvent resourceReleaseEvent) {
+    // The message names this handler; it previously named the launch handler, which misled log readers.
+    LOG.log(Level.FINEST, "Got ResourceReleaseEvent in AzureBatchResourceReleaseHandler");
+    this.azureBatchResourceManager.onResourceReleased(resourceReleaseEvent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java
new file mode 100644
index 0000000..fb9c000
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The {@link ResourceRequestHandler} of the Azure Batch runtime. It hands every
+ * {@link ResourceRequestEvent} over to the {@link AzureBatchResourceManager}.
+ */
+@Private
+@DriverSide
+public final class AzureBatchResourceRequestHandler implements ResourceRequestHandler {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchResourceRequestHandler.class.getName());
+
+  private final AzureBatchResourceManager resourceManager;
+
+  @Inject
+  AzureBatchResourceRequestHandler(
+      final AzureBatchResourceManager azureBatchResourceManager) {
+    this.resourceManager = azureBatchResourceManager;
+  }
+
+  /**
+   * Logs the requested memory size and virtual cores, then forwards the event to
+   * {@link AzureBatchResourceManager#onResourceRequested}.
+   *
+   * @param resourceRequestEvent the resource request event to forward.
+   */
+  @Override
+  public void onNext(final ResourceRequestEvent resourceRequestEvent) {
+    final Object[] requestDetails = {resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores()};
+    LOG.log(Level.FINEST, "Got ResourceRequestEvent in AzureBatchResourceRequestHandler: memory = {0}, cores = {1}.",
+        requestDetails);
+    this.resourceManager.onResourceRequested(resourceRequestEvent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java
new file mode 100644
index 0000000..0afe07d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import com.microsoft.azure.batch.protocol.models.CloudTask;
+import com.microsoft.azure.batch.protocol.models.TaskState;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.AzureBatchTaskStatusCheckPeriod;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Class that gets the status of the tasks from Azure Batch for the job that is currently in progress
+ * and notifies REEF of the status.
+ * Unlike YARN, Azure Batch does not support Resource Manager callbacks. Therefore, we must periodically call
+ * Azure Batch APIs to determine the status of tasks running inside our job.
+ *
+ * <p>The handler re-arms itself: at the end of each poll it schedules the next alarm on the {@link Clock}
+ * for as long as the alarm is enabled. The enabled flag is only read and written while holding this
+ * object's monitor.
+ */
+@Private
+@DriverSide
+final class AzureBatchTaskStatusAlarmHandler implements EventHandler<Alarm> {
+
+  private static final Logger LOG = Logger.getLogger(AzureBatchTaskStatusAlarmHandler.class.getName());
+
+  /**
+   * A placeholder remote ID which is used for reporting failed containers when the
+   * failure occurs before Wake communication channel can be established and the real
+   * remote ID is unknown.
+   */
+  private static final String PLACEHOLDER_REMOTE_ID = "socket://0.0.0.0:0";
+
+  private final InjectionFuture<AzureBatchEvaluatorShimManager> evaluatorShimManager;
+  private final AzureBatchHelper azureBatchHelper;
+  private final Evaluators evaluators;
+  private final Clock clock;
+
+  /** Offset passed to {@link Clock#scheduleAlarm} for every poll; logged as milliseconds. */
+  private final int taskStatusCheckPeriod;
+
+  /** Whether another poll should be scheduled; accessed only while holding this object's monitor. */
+  private boolean isAlarmEnabled;
+
+  @Inject
+  private AzureBatchTaskStatusAlarmHandler(
+      final InjectionFuture<AzureBatchEvaluatorShimManager> evaluatorShimManager,
+      final AzureBatchHelper azureBatchHelper,
+      final Evaluators evaluators,
+      final Clock clock,
+      @Parameter(AzureBatchTaskStatusCheckPeriod.class) final int taskStatusCheckPeriod) {
+    this.evaluatorShimManager = evaluatorShimManager;
+    this.azureBatchHelper = azureBatchHelper;
+    this.evaluators = evaluators;
+    this.clock = clock;
+    this.taskStatusCheckPeriod = taskStatusCheckPeriod;
+  }
+
+  /**
+   * This method is periodically invoked by the Runtime Clock. It will call Azure Batch APIs to determine
+   * the status of tasks running inside the job and notify REEF of tasks statuses that correspond to running
+   * evaluators.
+   *
+   * @param alarm alarm object.
+   */
+  @Override
+  public void onNext(final Alarm alarm) {
+    final String jobId = this.azureBatchHelper.getAzureBatchJobId();
+    final List<CloudTask> tasks = this.azureBatchHelper.getTaskStatusForJob(jobId);
+
+    // Report status if the task has an associated active container.
+    LOG.log(Level.FINER, "Found {0} tasks from job id {1}", new Object[]{tasks.size(), jobId});
+    for (final CloudTask task : tasks) {
+      final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(task.id());
+
+      if (!evaluatorManager.isPresent()) {
+        if (TaskState.COMPLETED.equals(task.state())) {
+          // This indicates that the evaluator shim exited prematurely. We inform REEF of resource allocation
+          // so it's possible to trigger an event signaling resource failure later.
+          // The single quotes are doubled because the message is a MessageFormat pattern, in which a lone
+          // single quote starts a quoted section and is not printed.
+          LOG.log(Level.INFO, "Azure Batch task id = {0} is in ''COMPLETED'' state, but it does not have " +
+              "an Evaluator associated with it. This indicates that the evaluator shim has failed before " +
+              "it could send a callback to the driver.", task.id());
+          this.evaluatorShimManager.get().onResourceAllocated(task.id(), PLACEHOLDER_REMOTE_ID, Optional.of(task));
+        } else {
+          // This usually means that the evaluator shim has started, but hasn't sent the status message
+          // back to the driver yet.
+          LOG.log(Level.FINE, "No Evaluator found for Azure Batch task id = {0}. Ignoring.", task.id());
+        }
+      } else if (evaluatorManager.get().isClosedOrClosing()) {
+        LOG.log(Level.FINE, "Evaluator id = {0} is closed. Ignoring.", task.id());
+      } else {
+        LOG.log(Level.FINE, "Reporting status for Task Id: {0} is [Azure Batch Status]:{1} ",
+            new Object[]{task.id(), task.state().toString()});
+        this.evaluatorShimManager.get().onAzureBatchTaskStatus(task);
+      }
+    }
+
+    // Schedule the next poll unless the alarm was disabled in the meantime.
+    synchronized (this) {
+      if (this.isAlarmEnabled) {
+        this.scheduleAlarm();
+      }
+    }
+  }
+
+  /**
+   * Enable the periodic alarm to send status updates. Does nothing besides logging if the alarm is
+   * already enabled.
+   */
+  public synchronized void enableAlarm() {
+    if (this.isAlarmEnabled) {
+      LOG.log(Level.FINE, "Alarm is already enabled.");
+      return;
+    }
+
+    LOG.log(Level.FINE, "Enabling the alarm and scheduling it to fire in {0} ms.", this.taskStatusCheckPeriod);
+    this.isAlarmEnabled = true;
+    this.scheduleAlarm();
+  }
+
+  /**
+   * Disable the periodic alarm to send status updates. This only clears the enabled flag, so that
+   * {@link #onNext(Alarm)} stops scheduling further alarms.
+   */
+  public synchronized void disableAlarm() {
+    this.isAlarmEnabled = false;
+  }
+
+  /**
+   * Schedules this handler on the clock with the configured status check period as the offset.
+   */
+  private void scheduleAlarm() {
+    this.clock.scheduleAlarm(this.taskStatusCheckPeriod, this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java
new file mode 100644
index 0000000..9db16ec
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.*;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Helper that represents the REEF layer to the Azure Batch runtime.
+ * Each method forwards its event to the matching handler injected through the constructor.
+ */
+@Private
+public final class REEFEventHandlers {
+  // Currently not used by any method of this class.
+  private static final Logger LOG = Logger.getLogger(REEFEventHandlers.class.getName());
+
+  private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler;
+  private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler;
+  private final EventHandler<ResourceAllocationEvent> resourceAllocationHandler;
+  private final EventHandler<ResourceStatusEvent> resourceStatusHandler;
+
+  @Inject
+  REEFEventHandlers(@Parameter(RuntimeParameters.NodeDescriptorHandler.class)
+                    final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler,
+                    @Parameter(RuntimeParameters.RuntimeStatusHandler.class)
+                    final EventHandler<RuntimeStatusEvent> runtimeStatusProtoEventHandler,
+                    @Parameter(RuntimeParameters.ResourceAllocationHandler.class)
+                    final EventHandler<ResourceAllocationEvent> resourceAllocationHandler,
+                    @Parameter(RuntimeParameters.ResourceStatusHandler.class)
+                    final EventHandler<ResourceStatusEvent> resourceStatusHandler) {
+    this.nodeDescriptorEventHandler = nodeDescriptorEventHandler;
+    this.runtimeStatusHandler = runtimeStatusProtoEventHandler;
+    this.resourceAllocationHandler = resourceAllocationHandler;
+    this.resourceStatusHandler = resourceStatusHandler;
+  }
+
+  /**
+   * Inform REEF of a node.
+   *
+   * @param nodeDescriptorEvent the node descriptor event passed to the node descriptor handler.
+   */
+  void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorEvent) {
+    this.nodeDescriptorEventHandler.onNext(nodeDescriptorEvent);
+  }
+
+  /**
+   * Update REEF's view on the runtime status.
+   *
+   * @param runtimeStatusEvent the runtime status event passed to the runtime status handler.
+   */
+  @Private
+  public void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) {
+    this.runtimeStatusHandler.onNext(runtimeStatusEvent);
+  }
+
+  /**
+   * Inform REEF of a fresh resource allocation.
+   *
+   * @param resourceAllocationEvent the allocation event passed to the resource allocation handler.
+   */
+  @Private
+  public void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) {
+    this.resourceAllocationHandler.onNext(resourceAllocationEvent);
+  }
+
+  /**
+   * Update REEF on a change to the status of a resource.
+   *
+   * @param resourceStatusEvent the status event passed to the resource status handler.
+   */
+  void onResourceStatus(final ResourceStatusEvent resourceStatusEvent) {
+    this.resourceStatusHandler.onNext(resourceStatusEvent);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java
new file mode 100644
index 0000000..aa61db0
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.driver;
+
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Holder of the name that identifies the Azure Batch runtime.
+ */
+@Private
+public final class RuntimeIdentifier {
+
+  /**
+   * Not instantiable: this class only exposes a constant.
+   */
+  private RuntimeIdentifier() {
+  }
+
+  /**
+   * Name of the Azure Batch runtime.
+   * Same value is defined on the C# side in the Org.Apache.REEF.Common.Runtime.RuntimeName.
+   */
+  public static final String RUNTIME_NAME = "AzBatch";
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java
new file mode 100644
index 0000000..37addfe
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-side implementation of the REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch.driver;

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java
new file mode 100644
index 0000000..1579c26
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
+
+import com.microsoft.windowsazure.storage.StorageException;
+import com.microsoft.windowsazure.storage.blob.CloudBlob;
+import com.microsoft.windowsazure.storage.blob.CloudBlockBlob;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorShimProtocol;
+import org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier;
+import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+/**
+ * The evaluator shim acts as a wrapper process around the Evaluator. Azure Batch starts this process on the evaluator
+ * node at the time that the resource is allocated. Once started, the evaluator shim process will send a status
+ * message back to the Driver which triggers a
+ * {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent} on the Driver side.
+ * The evaluator shim will then wait for a command from the Driver to start the Evaluator process.
+ * Upon receiving the command, the shim will launch the evaluator process and wait for it to exit. After receiving
+ * a terminate command, the evaluator shim will exit thus releasing the resource and completing the Azure Batch task.
+ */
+@Private
+@EvaluatorSide
+public final class EvaluatorShim
+    implements EventHandler<RemoteMessage<EvaluatorShimProtocol.EvaluatorShimControlProto>> {
+  private static final Logger LOG = Logger.getLogger(EvaluatorShim.class.getName());
+
+  private final RemoteManager remoteManager;
+  private final REEFFileNames reefFileNames;
+  private final AzureBatchFileNames azureBatchFileNames;
+  private final ConfigurationSerializer configurationSerializer;
+
+  private final String driverRemoteId;
+  private final String containerId;
+
+  private final EventHandler<EvaluatorShimProtocol.EvaluatorShimStatusProto> evaluatorShimStatusChannel;
+  private final AutoCloseable evaluatorShimCommandChannel;
+
+  private final ExecutorService threadPool;
+
+  private Process evaluatorProcess;
+  private Integer evaluatorProcessExitValue;
+
+  @Inject
+  EvaluatorShim(final REEFFileNames reefFileNames,
+                final AzureBatchFileNames azureBatchFileNames,
+                final ConfigurationSerializer configurationSerializer,
+                final RemoteManager remoteManager,
+                @Parameter(DriverRemoteIdentifier.class) final String driverRemoteId,
+                @Parameter(ContainerIdentifier.class) final String containerId) {
+    this.reefFileNames = reefFileNames;
+    this.azureBatchFileNames = azureBatchFileNames;
+    this.configurationSerializer = configurationSerializer;
+
+    this.driverRemoteId = driverRemoteId;
+    this.containerId = containerId;
+
+    this.remoteManager = remoteManager;
+    this.evaluatorShimStatusChannel = this.remoteManager.getHandler(this.driverRemoteId,
+        EvaluatorShimProtocol.EvaluatorShimStatusProto.class);
+
+    this.evaluatorShimCommandChannel = this.remoteManager
+        .registerHandler(EvaluatorShimProtocol.EvaluatorShimControlProto.class, this);
+
+    this.threadPool = Executors.newCachedThreadPool();
+  }
+
+  /**
+   * Starts the {@link EvaluatorShim}.
+   */
+  public void run() {
+    LOG.log(Level.FINEST, "Entering EvaluatorShim.run().");
+    this.onStart();
+  }
+
+  /**
+   * Stops the {@link EvaluatorShim}.
+   */
+  public void stop() {
+    LOG.log(Level.FINEST, "Entering EvaluatorShim.stop().");
+    this.onStop();
+  }
+
+  /**
+   * Handles a command message delivered by the Remote Manager on behalf of the Driver.
+   * <p>
+   * The actual work for a recognized command is handed off to the shim's thread pool, so this
+   * method returns without waiting for the command to complete.
+   *
+   * @param remoteMessage the message sent to the evaluator shim by the Driver.
+   * @throws IllegalArgumentException if the command carried by the message is not recognized.
+   */
+  @Override
+  public void onNext(final RemoteMessage<EvaluatorShimProtocol.EvaluatorShimControlProto> remoteMessage) {
+    final EvaluatorShimProtocol.EvaluatorShimControlProto controlMessage = remoteMessage.getMessage();
+    final EvaluatorShimProtocol.EvaluatorShimCommand command = controlMessage.getCommand();
+
+    // Pick the task matching the command first; it is submitted to the pool once, below.
+    final Runnable task;
+    switch (command) {
+    case LAUNCH_EVALUATOR:
+      LOG.log(Level.INFO, "Received a command to launch the Evaluator.");
+      task = new Runnable() {
+        @Override
+        public void run() {
+          EvaluatorShim.this.onEvaluatorLaunch(
+              controlMessage.getEvaluatorLaunchCommand(),
+              controlMessage.getEvaluatorConfigString(),
+              controlMessage.getEvaluatorFileResourcesUrl());
+        }
+      };
+      break;
+
+    case TERMINATE:
+      LOG.log(Level.INFO, "Received a command to terminate the EvaluatorShim.");
+      task = new Runnable() {
+        @Override
+        public void run() {
+          EvaluatorShim.this.onStop();
+        }
+      };
+      break;
+
+    default:
+      LOG.log(Level.WARNING, "An unknown command was received by the EvaluatorShim: {0}.", command);
+      throw new IllegalArgumentException("An unknown command was received by the EvaluatorShim.");
+    }
+
+    this.threadPool.submit(task);
+  }
+
+  private void onStart() {
+    LOG.log(Level.FINEST, "Entering EvaluatorShim.onStart().");
+
+    LOG.log(Level.INFO, "Reporting back to the driver with Shim Status = {0}",
+        EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE);
+    this.evaluatorShimStatusChannel.onNext(
+        EvaluatorShimProtocol.EvaluatorShimStatusProto
+            .newBuilder()
+            .setRemoteIdentifier(this.remoteManager.getMyIdentifier())
+            .setContainerId(this.containerId)
+            .setStatus(EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE)
+            .build());
+
+    LOG.log(Level.FINEST, "Exiting EvaluatorShim.onStart().");
+  }
+
+  /**
+   * Releases the resources held by the shim: the command channel, the Remote Manager and the thread pool.
+   * <p>
+   * Every cleanup step is attempted even if an earlier one fails, so that a failure to close the command
+   * channel does not leave the Remote Manager open or the thread pool running.
+   *
+   * @throws RuntimeException wrapping the first failure encountered while closing the command channel or
+   *     the Remote Manager; a later failure is attached to it as a suppressed exception.
+   */
+  private void onStop() {
+    LOG.log(Level.FINEST, "Entering EvaluatorShim.onStop().");
+
+    RuntimeException failure = null;
+
+    try {
+      LOG.log(Level.INFO, "Closing EvaluatorShim Control channel.");
+      this.evaluatorShimCommandChannel.close();
+    } catch (final Exception e) {
+      // Pass the exception as the Throwable argument so that its stack trace is logged.
+      LOG.log(Level.SEVERE, "An unexpected exception occurred while attempting to close the EvaluatorShim " +
+          "control channel.", e);
+      failure = new RuntimeException(e);
+    }
+
+    try {
+      LOG.log(Level.INFO, "Closing the Remote Manager.");
+      this.remoteManager.close();
+    } catch (final Exception e) {
+      // No "{0}" placeholder here: with a Throwable argument the message is not run through MessageFormat.
+      LOG.log(Level.SEVERE, "Failed to close the RemoteManager.", e);
+      if (failure == null) {
+        failure = new RuntimeException(e);
+      } else {
+        failure.addSuppressed(e);
+      }
+    }
+
+    LOG.log(Level.INFO, "Shutting down the thread pool.");
+    this.threadPool.shutdown();
+
+    if (failure != null) {
+      throw failure;
+    }
+
+    LOG.log(Level.FINEST, "Exiting EvaluatorShim.onStop().");
+  }
+
+  /**
+   * Prepares the working directory and runs the Evaluator process to completion.
+   * <p>
+   * If a file resources URL is given, the archive is downloaded and extracted first. The evaluator
+   * configuration is then deserialized and written to the evaluator configuration path, and finally the
+   * launch command is executed with its stdout/stderr redirected to files. This method blocks the calling
+   * thread until the Evaluator process exits.
+   *
+   * @param launchCommand the command line used to start the Evaluator process; it is split on single spaces.
+   * @param evaluatorConfigString the serialized evaluator configuration.
+   * @param fileResourcesUrl URL of the evaluator file resources archive; may be blank when there is none.
+   * @throws RuntimeException if the resources cannot be downloaded or extracted, the configuration cannot be
+   *     written, the process cannot be started, or the wait for the process is interrupted.
+   */
+  private void onEvaluatorLaunch(final String launchCommand, final String evaluatorConfigString,
+                                 final String fileResourcesUrl) {
+    LOG.log(Level.FINEST, "Entering EvaluatorShim.onEvaluatorLaunch().");
+
+    if (StringUtils.isNotBlank(fileResourcesUrl)) {
+      LOG.log(Level.FINER, "Downloading evaluator resource file archive from {0}.", fileResourcesUrl);
+      try {
+        final File tmpFile = downloadFile(fileResourcesUrl);
+        extractFiles(tmpFile);
+      } catch (final StorageException | IOException e) {
+        // Pass the exception as the Throwable argument so that its stack trace is logged.
+        LOG.log(Level.SEVERE, "Failed to download evaluator file resources: " + fileResourcesUrl, e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      LOG.log(Level.FINER, "No file resources URL given.");
+    }
+
+    final File evaluatorConfigurationFile = new File(this.reefFileNames.getEvaluatorConfigurationPath());
+    LOG.log(Level.FINER, "Persisting evaluator config at: {0}", evaluatorConfigurationFile.getAbsolutePath());
+
+    try {
+      final boolean newFileCreated = evaluatorConfigurationFile.createNewFile();
+      LOG.log(Level.FINEST,
+          newFileCreated ? "Created a new file for persisting evaluator configuration at {0}."
+              : "Using existing file for persisting evaluator configuration at {0}.",
+          evaluatorConfigurationFile.getAbsolutePath());
+
+      final Configuration evaluatorConfiguration = this.configurationSerializer.fromString(evaluatorConfigString);
+      this.configurationSerializer.toFile(evaluatorConfiguration, evaluatorConfigurationFile);
+    } catch (final IOException | BindException e) {
+      // No "{0}" placeholder here: with a Throwable argument the message is not run through MessageFormat.
+      LOG.log(Level.SEVERE, "An unexpected exception occurred while attempting to deserialize and write " +
+          "Evaluator configuration file.", e);
+      throw new RuntimeException("Unable to write configuration.", e);
+    }
+
+    LOG.log(Level.INFO, "Launching the evaluator by invoking the following command: " + launchCommand);
+
+    try {
+      final List<String> command = Arrays.asList(launchCommand.split(" "));
+      this.evaluatorProcess = new ProcessBuilder()
+          .command(command)
+          .redirectError(new File(this.azureBatchFileNames.getEvaluatorStdErrFilename()))
+          .redirectOutput(new File(this.azureBatchFileNames.getEvaluatorStdOutFilename()))
+          .start();
+
+      // This will block the current thread until the Evaluator process completes.
+      this.evaluatorProcessExitValue = this.evaluatorProcess.waitFor();
+      LOG.log(Level.INFO, "Evaluator process completed with exit value: {0}.", this.evaluatorProcessExitValue);
+    } catch (final IOException e) {
+      LOG.log(Level.SEVERE, "Failed to launch the evaluator process.", e);
+      throw new RuntimeException(e);
+    } catch (final InterruptedException e) {
+      // Restore the interrupt status so that the owner of this thread can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.log(Level.SEVERE, "Interrupted while waiting for the evaluator process to complete.", e);
+      throw new RuntimeException(e);
+    }
+
+    LOG.log(Level.FINEST, "Exiting EvaluatorShim.onEvaluatorLaunch().");
+  }
+
+  private File downloadFile(final String url) throws IOException, StorageException {
+    URI fileUri = URI.create(url);
+    File downloadedFile = new File(this.azureBatchFileNames.getEvaluatorResourceFilesJarName());
+    LOG.log(Level.FINE, "Downloading evaluator file resources to {0}.", downloadedFile.getAbsolutePath());
+
+    try (FileOutputStream fileStream = new FileOutputStream(downloadedFile)) {
+      CloudBlob blob = new CloudBlockBlob(fileUri);
+      blob.download(fileStream);
+    }
+
+    return downloadedFile;
+  }
+
+  private void extractFiles(final File zipFile) throws IOException {
+    try (ZipFile zipFileHandle = new ZipFile(zipFile)) {
+      Enumeration<? extends ZipEntry> zipEntries = zipFileHandle.entries();
+      while (zipEntries.hasMoreElements()) {
+        ZipEntry zipEntry = zipEntries.nextElement();
+        File file = new File(this.reefFileNames.getREEFFolderName() + '/' + zipEntry.getName());
+        if (file.exists()) {
+          LOG.log(Level.INFO, "Skipping entry {0} because the file already exists.", zipEntry.getName());
+        } else {
+          if (zipEntry.isDirectory()) {
+            if (file.mkdirs()) {
+              LOG.log(Level.INFO, "Creating directory {0}.", zipEntry.getName());
+            } else {
+              LOG.log(Level.INFO, "Directory {0} already exists. Ignoring.", zipEntry.getName());
+            }
+          } else {
+            try (InputStream inputStream = zipFileHandle.getInputStream(zipEntry)) {
+              LOG.log(Level.INFO, "Extracting {0}.", zipEntry.getName());
+              Files.copy(inputStream, Paths.get(this.reefFileNames.getREEFFolderName() + '/' + zipEntry.getName()));
+              LOG.log(Level.INFO, "Extracting {0} completed.", zipEntry.getName());
+            }
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
new file mode 100644
index 0000000..c114ba2
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+
+/**
+ * ConfigurationModule to create evaluator shim configurations.
+ * <p>
+ * {@link #CONF} binds the remote message codec to {@link REEFMessageCodec} and exposes the Driver's remote
+ * identifier and the container identifier as required parameters.
+ */
+@Private
+@EvaluatorSide
+public final class EvaluatorShimConfiguration extends ConfigurationModuleBuilder {
+
+  /**
+   * Required value bound to {@link DriverRemoteIdentifier} in {@link #CONF}.
+   *
+   * @see org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier
+   */
+  public static final RequiredParameter<String> DRIVER_REMOTE_IDENTIFIER = new RequiredParameter<>();
+
+  /**
+   * Required value bound to {@link ContainerIdentifier} in {@link #CONF}.
+   *
+   * @see org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier
+   */
+  public static final RequiredParameter<String> CONTAINER_IDENTIFIER = new RequiredParameter<>();
+
+  /**
+   * The configuration module; both required parameters above must be set before it can be built.
+   */
+  public static final ConfigurationModule CONF = new EvaluatorShimConfiguration()
+      .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+      .bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER)
+      .bindNamedParameter(ContainerIdentifier.class, CONTAINER_IDENTIFIER)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java
new file mode 100644
index 0000000..bfc5c1d
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.runtime.azbatch.parameters.EvaluatorShimConfigFilePath;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Command-line entry point that bootstraps the {@link EvaluatorShim}.
+ * <p>
+ * {@link #main(String[])} takes the path of the shim configuration file as its only argument, loads that
+ * configuration from disk and uses it to instantiate and start the shim.
+ */
+@Private
+@EvaluatorSide
+public final class EvaluatorShimLauncher {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorShimLauncher.class.getName());
+
+  private final String configurationFilePath;
+  private final ConfigurationSerializer configurationSerializer;
+
+  /**
+   * @param configurationFilePath path of the file holding the serialized shim configuration.
+   * @param configurationSerializer serializer used to read the configuration file.
+   */
+  @Inject
+  EvaluatorShimLauncher(
+      @Parameter(EvaluatorShimConfigFilePath.class) final String configurationFilePath,
+      final ConfigurationSerializer configurationSerializer) {
+    this.configurationSerializer = configurationSerializer;
+    this.configurationFilePath = configurationFilePath;
+  }
+
+  /**
+   * Instantiates the {@link EvaluatorShim} from the configuration file and starts it.
+   *
+   * @throws Exception if the shim cannot be instantiated or fails to start.
+   */
+  public void launch() throws Exception {
+    final Configuration shimConfiguration = readConfigurationFromDisk(this.configurationFilePath);
+    final Injector shimInjector = Tang.Factory.getTang().newInjector(shimConfiguration);
+    shimInjector.getInstance(EvaluatorShim.class).run();
+  }
+
+  /**
+   * Loads a Tang configuration from the given file.
+   *
+   * @param configPath path of the configuration file.
+   * @return the deserialized configuration.
+   * @throws RuntimeException if the file is missing, unreadable, or cannot be read by the serializer.
+   */
+  private Configuration readConfigurationFromDisk(final String configPath) {
+
+    LOG.log(Level.FINER, "Loading configuration file: {0}", configPath);
+
+    final File configFile = new File(configPath);
+    final boolean readable = configFile.exists() && configFile.canRead();
+
+    if (!readable) {
+      throw new RuntimeException(
+          "Configuration file " + configPath + " doesn't exist or is not readable.",
+          new IOException(configPath));
+    }
+
+    final Configuration loaded;
+    try {
+      loaded = this.configurationSerializer.fromFile(configFile);
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to parse the configuration file: " + configPath, e);
+    }
+
+    LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath);
+    return loaded;
+  }
+
+  /**
+   * Turns the command-line arguments into a configuration binding the shim configuration file path.
+   *
+   * @param args the command-line arguments; exactly one (the configuration file path) is expected.
+   * @return a configuration with {@link EvaluatorShimConfigFilePath} bound to the given path.
+   * @throws RuntimeException if the number of arguments is not exactly one.
+   */
+  private static Configuration parseCommandLine(final String[] args) {
+    if (args.length != 1) {
+      throw new RuntimeException("Expected configuration file name as an argument.");
+    }
+
+    final String shimConfigFilePath = args[0];
+    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
+    builder.bindNamedParameter(EvaluatorShimConfigFilePath.class, shimConfigFilePath);
+    return builder.build();
+  }
+
+  /**
+   * The starting point of the evaluator shim launcher.
+   *
+   * @param args the command-line arguments; the single expected argument is the configuration file path.
+   * @throws Exception if the launcher cannot be instantiated or the launch fails.
+   */
+  public static void main(final String[] args) throws Exception {
+    LOG.log(Level.INFO, "Entering EvaluatorShimLauncher.main().");
+
+    final Configuration launcherConfiguration = parseCommandLine(args);
+    final Injector launcherInjector = Tang.Factory.getTang().newInjector(launcherConfiguration);
+    launcherInjector.getInstance(EvaluatorShimLauncher.class).launch();
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java
new file mode 100644
index 0000000..cfa7b1b
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Evaluator-side classes (the evaluator shim and its launcher) of the REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch.evaluator;

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java
new file mode 100644
index 0000000..e10dbd6
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * A REEF runtime for Azure Batch.
+ */
+package org.apache.reef.runtime.azbatch;

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java
new file mode 100644
index 0000000..2efe842
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Batch account key.
+ * <p>
+ * Tang named parameter of type {@code String}; the annotation declares no default value.
+ */
+@NamedParameter(doc = "The Azure Batch account key.")
+public final class AzureBatchAccountKey implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java
new file mode 100644
index 0000000..09f49d7
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Batch account name.
+ * <p>
+ * Tang named parameter of type {@code String}; the annotation declares no default value.
+ */
+@NamedParameter(doc = "The Azure Batch account name.")
+public final class AzureBatchAccountName implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java
new file mode 100644
index 0000000..1f2ec54
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Name;
+
+/**
+ * The Azure Batch account URI.
+ * <p>
+ * Tang named parameter of type {@code String}; the annotation declares no default value.
+ */
+@NamedParameter(doc = "The Azure Batch account URI.")
+public final class AzureBatchAccountUri implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java
new file mode 100644
index 0000000..88137f8
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The Azure Batch pool ID.
+ * <p>
+ * Tang named parameter of type {@code String}; the annotation declares no default value.
+ */
+@NamedParameter(doc = "The Azure Batch pool ID.")
+public final class AzureBatchPoolId implements Name<String> {
+}