You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:46:51 UTC

[18/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
new file mode 100644
index 0000000..fc1696e
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.collections.ListUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto;
+import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
+import org.apache.reef.runtime.yarn.util.YarnTypes;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+
+import javax.inject.Inject;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+final class YarnContainerManager
+    implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
+
+  private static final Logger LOG = Logger.getLogger(YarnContainerManager.class.getName());
+
+  private static final String RUNTIME_NAME = "YARN";
+
+  private static final String ADD_FLAG = "+";
+
+  private static final String REMOVE_FLAG = "-";
+
+  private final YarnClient yarnClient = YarnClient.createYarnClient();
+
+  private final Queue<AMRMClient.ContainerRequest> requestsBeforeSentToRM = new ConcurrentLinkedQueue<>();
+
+  private final Queue<AMRMClient.ContainerRequest> requestsAfterSentToRM = new ConcurrentLinkedQueue<>();
+
+  private final Map<String, String> nodeIdToRackName = new ConcurrentHashMap<>();
+
+  private final YarnConfiguration yarnConf;
+  private final AMRMClientAsync resourceManager;
+  private final NMClientAsync nodeManager;
+  private final REEFEventHandlers reefEventHandlers;
+  private final Containers containers;
+  private final ApplicationMasterRegistration registration;
+  private final ContainerRequestCounter containerRequestCounter;
+  private final DriverStatusManager driverStatusManager;
+  private final TrackingURLProvider trackingURLProvider;
+
+  @Inject
+  YarnContainerManager(
+      final YarnConfiguration yarnConf,
+      final @Parameter(YarnHeartbeatPeriod.class) int yarnRMHeartbeatPeriod,
+      final REEFEventHandlers reefEventHandlers,
+      final Containers containers,
+      final ApplicationMasterRegistration registration,
+      final ContainerRequestCounter containerRequestCounter,
+      final DriverStatusManager driverStatusManager,
+      final TrackingURLProvider trackingURLProvider) throws IOException {
+
+    this.reefEventHandlers = reefEventHandlers;
+    this.driverStatusManager = driverStatusManager;
+
+    this.containers = containers;
+    this.registration = registration;
+    this.containerRequestCounter = containerRequestCounter;
+    this.yarnConf = yarnConf;
+    this.trackingURLProvider = trackingURLProvider;
+
+
+    this.yarnClient.init(this.yarnConf);
+
+    this.resourceManager = AMRMClientAsync.createAMRMClientAsync(yarnRMHeartbeatPeriod, this);
+    this.nodeManager = new NMClientAsyncImpl(this);
+    LOG.log(Level.FINEST, "Instantiated YarnContainerManager");
+  }
+
+  @Override
+  public final void onContainersCompleted(final List<ContainerStatus> containerStatuses) {
+    for (final ContainerStatus containerStatus : containerStatuses) {
+      onContainerStatus(containerStatus);
+    }
+  }
+
+  @Override
+  public final void onContainersAllocated(final List<Container> containers) {
+
+    // ID is used for logging only
+    final String id = String.format("%s:%d",
+        Thread.currentThread().getName().replace(' ', '_'), System.currentTimeMillis());
+
+    LOG.log(Level.FINE, "TIME: Allocated Containers {0} {1} of {2}",
+        new Object[]{id, containers.size(), this.containerRequestCounter.get()});
+
+    for (final Container container : containers) {
+      handleNewContainer(container, false);
+    }
+
+    LOG.log(Level.FINE, "TIME: Processed Containers {0}", id);
+  }
+
+  @Override
+  public void onShutdownRequest() {
+    this.reefEventHandlers.onRuntimeStatus(RuntimeStatusProto.newBuilder()
+        .setName(RUNTIME_NAME).setState(ReefServiceProtos.State.DONE).build());
+    this.driverStatusManager.onError(new Exception("Shutdown requested by YARN."));
+  }
+
+  @Override
+  public void onNodesUpdated(final List<NodeReport> nodeReports) {
+    for (final NodeReport nodeReport : nodeReports) {
+      this.nodeIdToRackName.put(nodeReport.getNodeId().toString(), nodeReport.getRackName());
+      onNodeReport(nodeReport);
+    }
+  }
+
+  @Override
+  public final float getProgress() {
+    return 0; // TODO: return actual values for progress
+  }
+
+  @Override
+  public final void onError(final Throwable throwable) {
+    onRuntimeError(throwable);
+  }
+
+  @Override
+  public final void onContainerStarted(
+      final ContainerId containerId, final Map<String, ByteBuffer> stringByteBufferMap) {
+    final Optional<Container> container = this.containers.getOptional(containerId.toString());
+    if (container.isPresent()) {
+      this.nodeManager.getContainerStatusAsync(containerId, container.get().getNodeId());
+    }
+  }
+
+  @Override
+  public final void onContainerStatusReceived(
+      final ContainerId containerId, final ContainerStatus containerStatus) {
+    onContainerStatus(containerStatus);
+  }
+
+  @Override
+  public final void onContainerStopped(final ContainerId containerId) {
+    final boolean hasContainer = this.containers.hasContainer(containerId.toString());
+    if (hasContainer) {
+      final ResourceStatusProto.Builder resourceStatusBuilder =
+          ResourceStatusProto.newBuilder().setIdentifier(containerId.toString());
+      resourceStatusBuilder.setState(ReefServiceProtos.State.DONE);
+      this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
+    }
+  }
+
+  @Override
+  public final void onStartContainerError(
+      final ContainerId containerId, final Throwable throwable) {
+    handleContainerError(containerId, throwable);
+  }
+
+  @Override
+  public final void onGetContainerStatusError(
+      final ContainerId containerId, final Throwable throwable) {
+    handleContainerError(containerId, throwable);
+  }
+
+  @Override
+  public final void onStopContainerError(
+      final ContainerId containerId, final Throwable throwable) {
+    handleContainerError(containerId, throwable);
+  }
+
+  /**
+   * Submit the given launchContext to the given container.
+   */
+  void submit(final Container container, final ContainerLaunchContext launchContext) {
+    this.nodeManager.startContainerAsync(container, launchContext);
+  }
+
+  /**
+   * Release the given container.
+   */
+  void release(final String containerId) {
+    LOG.log(Level.FINE, "Release container: {0}", containerId);
+    final Container container = this.containers.removeAndGet(containerId);
+    this.resourceManager.releaseAssignedContainer(container.getId());
+    logContainerRemoval(container.getId().toString());
+    updateRuntimeStatus();
+  }
+
+  void onStart() {
+
+    this.yarnClient.start();
+    this.resourceManager.init(this.yarnConf);
+    this.resourceManager.start();
+    this.nodeManager.init(this.yarnConf);
+    this.nodeManager.start();
+
+    try {
+      for (final NodeReport nodeReport : this.yarnClient.getNodeReports(NodeState.RUNNING)) {
+        onNodeReport(nodeReport);
+      }
+    } catch (IOException | YarnException e) {
+      LOG.log(Level.WARNING, "Unable to fetch node reports from YARN.", e);
+      onRuntimeError(e);
+    }
+
+    try {
+      this.registration.setRegistration(this.resourceManager.registerApplicationMaster(
+          "", 0, this.trackingURLProvider.getTrackingUrl()));
+      LOG.log(Level.FINE, "YARN registration: {0}", registration);
+
+    } catch (final YarnException | IOException e) {
+      LOG.log(Level.WARNING, "Unable to register application master.", e);
+      onRuntimeError(e);
+    }
+
+    // TODO: this is currently being developed on a hacked 2.4.0 bits, should be 2.4.1
+    final String minVersionToGetPreviousContainer = "2.4.0";
+
+    // when supported, obtain the list of the containers previously allocated, and write info to driver folder
+    if (YarnTypes.isAtOrAfterVersion(minVersionToGetPreviousContainer)) {
+      LOG.log(Level.FINEST, "Hadoop version is {0} or after with support to retain previous containers, processing previous containers.", minVersionToGetPreviousContainer);
+      processPreviousContainers();
+    }
+  }
+
+  void onStop() {
+
+    LOG.log(Level.FINE, "Stop Runtime: RM status {0}", this.resourceManager.getServiceState());
+
+    if (this.resourceManager.getServiceState() == Service.STATE.STARTED) {
+      // invariant: if RM is still running then we declare success.
+      try {
+        this.reefEventHandlers.close();
+        this.resourceManager.unregisterApplicationMaster(
+            FinalApplicationStatus.SUCCEEDED, null, null);
+        this.resourceManager.close();
+      } catch (final Exception e) {
+        LOG.log(Level.WARNING, "Error shutting down YARN application", e);
+      }
+    }
+
+    if (this.nodeManager.getServiceState() == Service.STATE.STARTED) {
+      try {
+        this.nodeManager.close();
+      } catch (final IOException e) {
+        LOG.log(Level.WARNING, "Error closing YARN Node Manager", e);
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////
+  // HELPER METHODS
+
+  private void onNodeReport(final NodeReport nodeReport) {
+    LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport);
+    this.reefEventHandlers.onNodeDescriptor(NodeDescriptorProto.newBuilder()
+        .setIdentifier(nodeReport.getNodeId().toString())
+        .setHostName(nodeReport.getNodeId().getHost())
+        .setPort(nodeReport.getNodeId().getPort())
+        .setMemorySize(nodeReport.getCapability().getMemory())
+        .setRackName(nodeReport.getRackName())
+        .build());
+  }
+
+  private void handleContainerError(final ContainerId containerId, final Throwable throwable) {
+
+    final ResourceStatusProto.Builder resourceStatusBuilder =
+        ResourceStatusProto.newBuilder().setIdentifier(containerId.toString());
+
+    resourceStatusBuilder.setState(ReefServiceProtos.State.FAILED);
+    resourceStatusBuilder.setExitCode(1);
+    resourceStatusBuilder.setDiagnostics(throwable.getMessage());
+    this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build());
+  }
+
+  private void processPreviousContainers() {
+    final List<Container> previousContainers = this.registration.getRegistration().getContainersFromPreviousAttempts();
+    if (previousContainers != null && !previousContainers.isEmpty()) {
+      LOG.log(Level.INFO, "Driver restarted, with {0} previous containers", previousContainers.size());
+      this.driverStatusManager.setNumPreviousContainers(previousContainers.size());
+      final Set<String> expectedContainers = getExpectedContainersFromLogReplay();
+      final int numExpectedContainers = expectedContainers.size();
+      final int numPreviousContainers = previousContainers.size();
+      if (numExpectedContainers > numPreviousContainers) {
+        // we expected more containers to be alive, some containers must have died during driver restart
+        LOG.log(Level.WARNING, "Expected {0} containers while only {1} are still alive", new Object[]{numExpectedContainers, numPreviousContainers});
+        final Set<String> previousContainersIds = new HashSet<>();
+        for (final Container container : previousContainers) {
+          previousContainersIds.add(container.getId().toString());
+        }
+        for (final String expectedContainerId : expectedContainers) {
+          if (!previousContainersIds.contains(expectedContainerId)) {
+            logContainerRemoval(expectedContainerId);
+            LOG.log(Level.WARNING, "Expected container [{0}] not alive, must have failed during driver restart.", expectedContainerId);
+            informAboutConatinerFailureDuringRestart(expectedContainerId);
+          }
+        }
+      }
+      if (numExpectedContainers < numPreviousContainers) {
+        // somehow we have more alive evaluators, this should not happen
+        throw new RuntimeException("Expected only [" + numExpectedContainers + "] containers but resource manager believe that [" + numPreviousContainers + "] are outstanding for driver.");
+      }
+
+      //  numExpectedContainers == numPreviousContainers
+      for (final Container container : previousContainers) {
+        LOG.log(Level.FINE, "Previous container: [{0}]", container.toString());
+        if (!expectedContainers.contains(container.getId().toString())) {
+          throw new RuntimeException("Not expecting container " + container.getId().toString());
+        }
+        handleNewContainer(container, true);
+      }
+    }
+  }
+
+  /**
+   * Handles container status reports. Calls come from YARN.
+   *
+   * @param value containing the container status
+   */
+  private void onContainerStatus(final ContainerStatus value) {
+
+    final String containerId = value.getContainerId().toString();
+    final boolean hasContainer = this.containers.hasContainer(containerId);
+
+    if (hasContainer) {
+      LOG.log(Level.FINE, "Received container status: {0}", containerId);
+
+      final ResourceStatusProto.Builder status =
+          ResourceStatusProto.newBuilder().setIdentifier(containerId);
+
+      switch (value.getState()) {
+        case COMPLETE:
+          LOG.log(Level.FINE, "Container completed: status {0}", value.getExitStatus());
+          switch (value.getExitStatus()) {
+            case 0:
+              status.setState(ReefServiceProtos.State.DONE);
+              break;
+            case 143:
+              status.setState(ReefServiceProtos.State.KILLED);
+              break;
+            default:
+              status.setState(ReefServiceProtos.State.FAILED);
+          }
+          status.setExitCode(value.getExitStatus());
+          // remove the completed container (can be either done/killed/failed) from book keeping
+          this.containers.removeAndGet(containerId);
+          logContainerRemoval(containerId);
+          break;
+        default:
+          LOG.info("Container running");
+          status.setState(ReefServiceProtos.State.RUNNING);
+      }
+
+      if (value.getDiagnostics() != null) {
+        LOG.log(Level.FINE, "Container diagnostics: {0}", value.getDiagnostics());
+        status.setDiagnostics(value.getDiagnostics());
+      }
+
+      this.reefEventHandlers.onResourceStatus(status.build());
+    }
+  }
+
+  void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests) {
+
+    synchronized (this) {
+      this.containerRequestCounter.incrementBy(containerRequests.length);
+      this.requestsBeforeSentToRM.addAll(Arrays.asList(containerRequests));
+      doHomogeneousRequests();
+    }
+
+    this.updateRuntimeStatus();
+  }
+
+  /**
+   * Handles new container allocations. Calls come from YARN.
+   *
+   * @param container newly allocated
+   */
+  private void handleNewContainer(final Container container, final boolean isRecoveredContainer) {
+
+    LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId());
+    // recovered container is not new allocation, it is just checking back from previous driver failover
+    if (!isRecoveredContainer) {
+      synchronized (this) {
+        if (matchContainerWithPendingRequest(container)) {
+          final AMRMClient.ContainerRequest matchedRequest = this.requestsAfterSentToRM.peek();
+          this.containerRequestCounter.decrement();
+          this.containers.add(container);
+
+          LOG.log(Level.FINEST, "{0} matched with {1}", new Object[]{container.toString(), matchedRequest.toString()});
+
+          // Due to the bug YARN-314 and the workings of AMRMCClientAsync, when x-priority m-capacity zero-container request
+          // and x-priority n-capacity nonzero-container request are sent together, where m > n, RM ignores the latter.
+          // Therefore it is necessary avoid sending zero-container request, even it means getting extra containers.
+          // It is okay to send nonzero m-capacity and n-capacity request together since bigger containers can be matched.
+          // TODO: revisit this when implementing locality-strictness (i.e. a specific rack request can be ignored)
+          if (this.requestsAfterSentToRM.size() > 1) {
+            try {
+              this.resourceManager.removeContainerRequest(matchedRequest);
+            } catch (final Exception e) {
+              LOG.log(Level.WARNING, "Nothing to remove from Async AMRM client's queue, removal attempt failed with exception", e);
+            }
+          }
+
+          this.requestsAfterSentToRM.remove();
+          doHomogeneousRequests();
+
+          LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", new Object[]{container.getResource().getMemory(), container.getResource().getVirtualCores()});
+          this.reefEventHandlers.onResourceAllocation(ResourceAllocationProto.newBuilder()
+              .setIdentifier(container.getId().toString())
+              .setNodeId(container.getNodeId().toString())
+              .setResourceMemory(container.getResource().getMemory())
+              .setVirtualCores(container.getResource().getVirtualCores())
+              .build());
+          // we only add this to Container log after the Container has been registered as an REEF Evaluator.
+          logContainerAddition(container.getId().toString());
+          this.updateRuntimeStatus();
+        } else {
+          LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match, releasing...", container.getId());
+          this.resourceManager.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+  }
+
+  private synchronized void doHomogeneousRequests() {
+    if (this.requestsAfterSentToRM.isEmpty()) {
+      final AMRMClient.ContainerRequest firstRequest = this.requestsBeforeSentToRM.peek();
+
+      while (!this.requestsBeforeSentToRM.isEmpty() && isSameKindOfRequest(firstRequest, this.requestsBeforeSentToRM.peek())) {
+        final AMRMClient.ContainerRequest homogeneousRequest = this.requestsBeforeSentToRM.remove();
+        this.resourceManager.addContainerRequest(homogeneousRequest);
+        this.requestsAfterSentToRM.add(homogeneousRequest);
+      }
+    }
+  }
+
+  private boolean isSameKindOfRequest(AMRMClient.ContainerRequest r1, AMRMClient.ContainerRequest r2) {
+    return r1.getPriority().compareTo(r2.getPriority()) == 0
+        && r1.getCapability().compareTo(r2.getCapability()) == 0
+        && r1.getRelaxLocality() == r2.getRelaxLocality()
+        && ListUtils.isEqualList(r1.getNodes(), r2.getNodes())
+        && ListUtils.isEqualList(r1.getRacks(), r2.getRacks());
+  }
+
+  /**
+   * Match to see whether the container satisfies the request.
+   * We take into consideration that RM has some freedom in rounding
+   * up the allocation and in placing containers on other machines.
+   */
+  private boolean matchContainerWithPendingRequest(Container container) {
+    if (this.requestsAfterSentToRM.isEmpty()) {
+      return false;
+    }
+
+    final AMRMClient.ContainerRequest request = this.requestsAfterSentToRM.peek();
+    final boolean resourceCondition = container.getResource().getMemory() >= request.getCapability().getMemory(); // TODO: check vcores once YARN-2380 is resolved
+    final boolean nodeCondition = request.getNodes() == null
+        || request.getNodes().contains(container.getNodeId().getHost());
+    final boolean rackCondition = request.getRacks() == null
+        || request.getRacks().contains(this.nodeIdToRackName.get(container.getNodeId().toString()));
+
+    return resourceCondition && (request.getRelaxLocality() || (rackCondition && nodeCondition));
+  }
+
+  /**
+   * Update the driver with my current status
+   */
+  private void updateRuntimeStatus() {
+
+    final DriverRuntimeProtocol.RuntimeStatusProto.Builder builder =
+        DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+            .setName(RUNTIME_NAME)
+            .setState(ReefServiceProtos.State.RUNNING)
+            .setOutstandingContainerRequests(this.containerRequestCounter.get());
+
+    for (final String allocatedContainerId : this.containers.getContainerIds()) {
+      builder.addContainerAllocation(allocatedContainerId);
+    }
+
+    this.reefEventHandlers.onRuntimeStatus(builder.build());
+  }
+
+  private void onRuntimeError(final Throwable throwable) {
+
+    // SHUTDOWN YARN
+    try {
+      this.reefEventHandlers.close();
+      this.resourceManager.unregisterApplicationMaster(
+          FinalApplicationStatus.FAILED, throwable.getMessage(), null);
+    } catch (final Exception e) {
+      LOG.log(Level.WARNING, "Error shutting down YARN application", e);
+    } finally {
+      this.resourceManager.stop();
+    }
+
+    final RuntimeStatusProto.Builder runtimeStatusBuilder = RuntimeStatusProto.newBuilder()
+        .setState(ReefServiceProtos.State.FAILED)
+        .setName(RUNTIME_NAME);
+
+    final Encoder<Throwable> codec = new ObjectSerializableCodec<>();
+    runtimeStatusBuilder.setError(ReefServiceProtos.RuntimeErrorProto.newBuilder()
+        .setName(RUNTIME_NAME)
+        .setMessage(throwable.getMessage())
+        .setException(ByteString.copyFrom(codec.encode(throwable)))
+        .build())
+        .build();
+
+    this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build());
+  }
+
+  private Set<String> getExpectedContainersFromLogReplay() {
+    final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+    config.setBoolean("dfs.support.append", true);
+    config.setBoolean("dfs.support.broken.append", true);
+    final Set<String> expectedContainers = new HashSet<>();
+    try {
+      final FileSystem fs = FileSystem.get(config);
+      final Path path = new Path(getChangeLogLocation());
+      if (!fs.exists(path)) {
+        // empty set
+        return expectedContainers;
+      } else {
+        final BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
+        String line = br.readLine();
+        while (line != null) {
+          if (line.startsWith(ADD_FLAG)) {
+            final String containerId = line.substring(ADD_FLAG.length());
+            if (expectedContainers.contains(containerId)) {
+              throw new RuntimeException("Duplicated add container record found in the change log for container " + containerId);
+            }
+            expectedContainers.add(containerId);
+          } else if (line.startsWith(REMOVE_FLAG)) {
+            final String containerId = line.substring(REMOVE_FLAG.length());
+            if (!expectedContainers.contains(containerId)) {
+              throw new RuntimeException("Change log includes record that try to remove non-exist or duplicate remove record for container + " + containerId);
+            }
+            expectedContainers.remove(containerId);
+          }
+          line = br.readLine();
+        }
+        br.close();
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("Cannot read from log file", e);
+    }
+    return expectedContainers;
+  }
+
+  private void informAboutConatinerFailureDuringRestart(final String containerId) {
+    LOG.log(Level.WARNING, "Container [" + containerId +
+        "] has failed during driver restart process, FailedEvaluaorHandler will be triggered, but no additional evaluator can be requested due to YARN-2433.");
+    // trigger a failed evaluator event
+    this.reefEventHandlers.onResourceStatus(ResourceStatusProto.newBuilder()
+        .setIdentifier(containerId)
+        .setState(ReefServiceProtos.State.FAILED)
+        .setExitCode(1)
+        .setDiagnostics("Container [" + containerId + "] failed during driver restart process.")
+        .setIsFromPreviousDriver(true)
+        .build());
+  }
+
+  private void writeToEvaluatorLog(final String entry) throws IOException {
+    final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+    config.setBoolean("dfs.support.append", true);
+    config.setBoolean("dfs.support.broken.append", true);
+    final FileSystem fs = getFileSystemInstance();
+    final Path path = new Path(getChangeLogLocation());
+    final boolean appendToLog = fs.exists(path);
+
+    try (
+        final BufferedWriter bw = appendToLog ?
+            new BufferedWriter(new OutputStreamWriter(fs.append(path))) :
+            new BufferedWriter(new OutputStreamWriter(fs.create(path)));
+    ) {
+      bw.write(entry);
+    } catch (final IOException e) {
+      if (appendToLog) {
+        LOG.log(Level.FINE, "Unable to add an entry to the Evaluator log. Attempting append by delete and recreate", e);
+        appendByDeleteAndCreate(fs, path, entry);
+      }
+    }
+  }
+
+  private FileSystem getFileSystemInstance() throws IOException {
+    final org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
+    config.setBoolean("dfs.support.append", true);
+    config.setBoolean("dfs.support.broken.append", true);
+    return FileSystem.get(config);
+  }
+
+  /**
+   * For certain HDFS implementation, the append operation may not be supported (e.g., Azure blob - wasb)
+   * in this case, we will emulate the append operation by reading the content, appending entry at the end,
+   * then recreating the file with appended content.
+   *
+   * @throws java.io.IOException when the file can't be written.
+   */
+
+  private void appendByDeleteAndCreate(final FileSystem fs, final Path path, final String appendEntry) throws IOException {
+    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+    try (final InputStream inputStream = fs.open(path)) {
+      IOUtils.copyBytes(inputStream, outputStream, 4096, true);
+    }
+
+    final String newContent = outputStream.toString() + appendEntry;
+    fs.delete(path, true);
+
+    try (final FSDataOutputStream newOutput = fs.create(path);
+         final InputStream newInput = new ByteArrayInputStream(newContent.getBytes())) {
+      IOUtils.copyBytes(newInput, newOutput, 4096, true);
+    }
+
+  }
+
+  private String getChangeLogLocation() {
+    return "/ReefApplications/" + EvaluatorManager.getJobIdentifier() + "/evaluatorsChangesLog";
+  }
+
+  private void logContainerAddition(final String containerId) {
+    final String entry = ADD_FLAG + containerId + System.lineSeparator();
+    try {
+      writeToEvaluatorLog(entry);
+    } catch (final IOException e) {
+      LOG.log(Level.WARNING, "Unable to log the addition of container [" + containerId +
+          "] to the container log. Driver restart won't work properly.", e);
+    }
+  }
+
+  private void logContainerRemoval(final String containerId) {
+    final String entry = REMOVE_FLAG + containerId + System.lineSeparator();
+    try {
+      writeToEvaluatorLog(entry);
+    } catch (final IOException e) {
+      LOG.log(Level.WARNING, "Unable to log the removal of container [" + containerId +
+          "] to the container log. Driver restart won't work properly.", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java
new file mode 100644
index 0000000..f2c3a24
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+@DefaultImplementation(YarnContainerRequestHandlerImpl.class)
+public interface YarnContainerRequestHandler {
+  /**
+   * Enqueue a set of container requests with YARN.
+   *
+   * @param containerRequests
+   */
+  void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java
new file mode 100644
index 0000000..4007a5d
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Side-channel to request containers from YARN using AMRMClient.ContainerRequest requests.
+ */
+public final class YarnContainerRequestHandlerImpl implements YarnContainerRequestHandler {
+  private static final Logger LOG = Logger.getLogger(YarnContainerRequestHandlerImpl.class.getName());
+
+  private final YarnContainerManager containerManager;
+
+
+  @Inject
+  YarnContainerRequestHandlerImpl(final YarnContainerManager containerManager) {
+    this.containerManager = containerManager;
+    LOG.log(Level.FINEST, "Instantiated 'YarnContainerRequestHandler'");
+  }
+
+  @Override
+  public void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests) {
+    LOG.log(Level.FINEST, "Sending container requests to YarnContainerManager.");
+    this.containerManager.onContainerRequest(containerRequests);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
new file mode 100644
index 0000000..f240b7c
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverConfiguration.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.io.TempFileCreator;
+import org.apache.reef.io.WorkingDirectoryTempFileCreator;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
+import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
+import org.apache.reef.runtime.yarn.YarnClasspathProvider;
+import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
+import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
+import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * Created by marku_000 on 2014-07-07.
+ */
+public class YarnDriverConfiguration extends ConfigurationModuleBuilder {
+  /**
+   * @see org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory
+   */
+  public static final RequiredParameter<String> JOB_SUBMISSION_DIRECTORY = new RequiredParameter<>();
+  /**
+   * @see org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod.class
+   */
+  public static final OptionalParameter<Integer> YARN_HEARTBEAT_INTERVAL = new OptionalParameter<>();
+
+  /**
+   * @see AbstractDriverRuntimeConfiguration.JobIdentifier.class
+   */
+  public static final RequiredParameter<String> JOB_IDENTIFIER = new RequiredParameter<>();
+
+  /**
+   * @see AbstractDriverRuntimeConfiguration.EvaluatorTimeout
+   */
+  public static final OptionalParameter<Long> EVALUATOR_TIMEOUT = new OptionalParameter<>();
+
+  /**
+   * The client remote identifier.
+   */
+  public static final OptionalParameter<String> CLIENT_REMOTE_IDENTIFIER = new OptionalParameter<>();
+
+  /**
+   * The fraction of the container memory NOT to use for the Java Heap.
+   */
+  public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
+
+
+  public static ConfigurationModule CONF = new YarnDriverConfiguration()
+      // Bind the YARN runtime for the resource manager.
+      .bindImplementation(ResourceLaunchHandler.class, YARNResourceLaunchHandler.class)
+      .bindImplementation(ResourceReleaseHandler.class, YARNResourceReleaseHandler.class)
+      .bindImplementation(ResourceRequestHandler.class, YarnResourceRequestHandler.class)
+      .bindConstructor(YarnConfiguration.class, YarnConfigurationConstructor.class)
+      .bindSetEntry(Clock.RuntimeStartHandler.class, YARNRuntimeStartHandler.class)
+      .bindSetEntry(Clock.RuntimeStopHandler.class, YARNRuntimeStopHandler.class)
+      .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class)
+
+          // Bind the YARN Configuration parameters
+      .bindNamedParameter(JobSubmissionDirectory.class, JOB_SUBMISSION_DIRECTORY)
+      .bindNamedParameter(YarnHeartbeatPeriod.class, YARN_HEARTBEAT_INTERVAL)
+
+          // Bind the fields bound in AbstractDriverRuntimeConfiguration
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class, JOB_IDENTIFIER)
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.EvaluatorTimeout.class, EVALUATOR_TIMEOUT)
+      .bindNamedParameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class, CLIENT_REMOTE_IDENTIFIER)
+      .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK)
+      .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
new file mode 100644
index 0000000..6762362
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Accepts resource requests from the REEF layer, translates them into requests for YARN and hands them to the
+ * appropriate handler for those.
+ */
+@DriverSide
+@Private
+public final class YarnResourceRequestHandler implements ResourceRequestHandler {
+
+  private static final Logger LOG = Logger.getLogger(YarnResourceRequestHandler.class.getName());
+  private final YarnContainerRequestHandler yarnContainerRequestHandler;
+  private final ApplicationMasterRegistration registration;
+
+  @Inject
+  YarnResourceRequestHandler(final YarnContainerRequestHandler yarnContainerRequestHandler,
+                             final ApplicationMasterRegistration registration) {
+    this.yarnContainerRequestHandler = yarnContainerRequestHandler;
+    this.registration = registration;
+  }
+
+  @Override
+  public synchronized void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+    LOG.log(Level.FINEST, "Got ResourceRequestProto in YarnResourceRequestHandler: memory = {0}, cores = {1}.", new Object[]{resourceRequestProto.getMemorySize(), resourceRequestProto.getVirtualCores()});
+
+    final String[] nodes = resourceRequestProto.getNodeNameCount() == 0 ? null :
+        resourceRequestProto.getNodeNameList().toArray(new String[resourceRequestProto.getNodeNameCount()]);
+    final String[] racks = resourceRequestProto.getRackNameCount() == 0 ? null :
+        resourceRequestProto.getRackNameList().toArray(new String[resourceRequestProto.getRackNameCount()]);
+
+    // set the priority for the request
+    final Priority pri = getPriority(resourceRequestProto);
+    final Resource resource = getResource(resourceRequestProto);
+    final boolean relax_locality = !resourceRequestProto.hasRelaxLocality() || resourceRequestProto.getRelaxLocality();
+
+    final AMRMClient.ContainerRequest[] containerRequests =
+        new AMRMClient.ContainerRequest[resourceRequestProto.getResourceCount()];
+
+    for (int i = 0; i < resourceRequestProto.getResourceCount(); i++) {
+      containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relax_locality);
+    }
+    this.yarnContainerRequestHandler.onContainerRequest(containerRequests);
+  }
+
+  private synchronized Resource getResource(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+    final Resource result = Records.newRecord(Resource.class);
+    final int memory = getMemory(resourceRequestProto.getMemorySize());
+    final int core = resourceRequestProto.getVirtualCores();
+    LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core});
+    result.setMemory(memory);
+    result.setVirtualCores(core);
+    return result;
+  }
+
+  private synchronized Priority getPriority(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
+    final Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(resourceRequestProto.hasPriority() ? resourceRequestProto.getPriority() : 1);
+    return pri;
+  }
+
+  private synchronized int getMemory(final int requestedMemory) {
+    final int result;
+    if (!this.registration.isPresent()) {
+      LOG.log(Level.WARNING, "AM doesn't seem to be registered. Proceed with fingers crossed.");
+      result = requestedMemory;
+    } else {
+      final int maxMemory = registration.getRegistration().getMaximumResourceCapability().getMemory();
+      if (requestedMemory > maxMemory) {
+        LOG.log(Level.WARNING, "Asking for {0}MB of memory, but max on this cluster is {1}MB ",
+            new Object[]{requestedMemory, maxMemory});
+        result = maxMemory;
+      } else {
+        result = requestedMemory;
+      }
+    }
+    return result;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/package-info.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/package-info.java
new file mode 100644
index 0000000..63f621d
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * The Driver-Side implementation of the YARN adapter for REEF.
+ */
+package org.apache.reef.runtime.yarn.driver;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
new file mode 100644
index 0000000..395793d
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/JobSubmissionDirectory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The job submission directory.
+ */
+@NamedParameter(doc = "The job submission directory.")
+public final class JobSubmissionDirectory implements Name<String> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnHeartbeatPeriod.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnHeartbeatPeriod.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnHeartbeatPeriod.java
new file mode 100644
index 0000000..f3f3c65
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/parameters/YarnHeartbeatPeriod.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * How often we talk to YARN.
+ */
+@NamedParameter(doc = "How often we talk to YARN.", default_value = "1000")
+public final class YarnHeartbeatPeriod implements Name<Integer> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnConfigurationConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnConfigurationConstructor.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnConfigurationConstructor.java
new file mode 100644
index 0000000..c80f9bf
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnConfigurationConstructor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.util;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.reef.tang.ExternalConstructor;
+
+import javax.inject.Inject;
+
+/**
+ * An external constructor that creates YarnConfiguration instances.
+ */
+public final class YarnConfigurationConstructor implements ExternalConstructor<YarnConfiguration> {
+  @Inject
+  YarnConfigurationConstructor() {
+  }
+
+  @Override
+  public YarnConfiguration newInstance() {
+    return new YarnConfiguration();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
new file mode 100644
index 0000000..ad60db0
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/util/YarnTypes.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.util;
+
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.reef.annotations.audience.Private;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class that creates the various records in the YARN API
+ */
+@Private
+public final class YarnTypes {
+
+  private YarnTypes() {
+  }
+
+  /**
+   * @return a ContainerLaunchContext with the given commands and LocalResources.
+   */
+  public static final ContainerLaunchContext getContainerLaunchContext(
+      final List<String> commands, final Map<String, LocalResource> localResources) {
+    final ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+    context.setLocalResources(localResources);
+    context.setCommands(commands);
+    return context;
+  }
+
+  public static boolean isAtOrAfterVersion(final String version) {
+    final String hadoopVersion = VersionInfo.getVersion();
+
+    if (hadoopVersion == null || hadoopVersion.length() < version.length()) {
+      throw new RuntimeException("unsupported or incomplete hadoop version number provided for comparison: " + hadoopVersion);
+    }
+
+    return hadoopVersion.substring(0, version.length()).compareTo(version) >= 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-runtime-yarn/src/main/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java b/lang/java/reef-runtime-yarn/src/main/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java
new file mode 100644
index 0000000..0f08b15
--- /dev/null
+++ b/lang/java/reef-runtime-yarn/src/main/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver;
+
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.runtime.common.driver.EvaluatorRequestorImpl;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for YarnResourceRequestHandler.
+ */
+public final class YarnResourceRequestHandlerTest {
+  private final ApplicationMasterRegistration applicationMasterRegistration = new ApplicationMasterRegistration();
+  private final MockContainerRequestHandler containerRequestHandler = new MockContainerRequestHandler();
+  private final YarnResourceRequestHandler resourceRequestHandler = new YarnResourceRequestHandler(containerRequestHandler, applicationMasterRegistration);
+  private final ResourceCatalog resourceCatalog = Mockito.mock(ResourceCatalog.class);
+  private final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, resourceRequestHandler);
+
+  private class MockContainerRequestHandler implements YarnContainerRequestHandler {
+    private AMRMClient.ContainerRequest[] requests;
+
+    @Override
+    public void onContainerRequest(AMRMClient.ContainerRequest... containerRequests) {
+      this.requests = containerRequests;
+    }
+
+    public AMRMClient.ContainerRequest[] getRequests() {
+      return requests;
+    }
+  }
+
+  /**
+   * Tests whether the amount of memory is transferred correctly.
+   */
+  @Test
+  public void testDifferentMemory() {
+    final EvaluatorRequest requestOne = EvaluatorRequest.newBuilder()
+        .setNumber(1)
+        .setMemory(64)
+        .setNumberOfCores(1)
+        .build();
+    final EvaluatorRequest requestTwo = EvaluatorRequest.newBuilder()
+        .setNumber(1)
+        .setMemory(128)
+        .setNumberOfCores(2)
+        .build();
+    {
+      evaluatorRequestor.submit(requestOne);
+      Assert.assertEquals("Request in REEF and YARN form should have the same amount of memory",
+          requestOne.getMegaBytes(),
+          containerRequestHandler.getRequests()[0].getCapability().getMemory()
+      );
+    }
+    {
+      evaluatorRequestor.submit(requestTwo);
+      Assert.assertEquals("Request in REEF and YARN form should have the same amount of memory",
+          requestTwo.getMegaBytes(),
+          containerRequestHandler.getRequests()[0].getCapability().getMemory()
+      );
+    }
+    {
+      evaluatorRequestor.submit(requestOne);
+      Assert.assertNotEquals("Request in REEF and YARN form should have the same amount of memory",
+          requestTwo.getMegaBytes(),
+          containerRequestHandler.getRequests()[0].getCapability().getMemory()
+      );
+    }
+  }
+
+  @Test
+  public void testEvaluatorCount() {
+    final EvaluatorRequest requestOne = EvaluatorRequest.newBuilder()
+        .setNumber(1)
+        .setMemory(64)
+        .setNumberOfCores(1)
+        .build();
+    final EvaluatorRequest requestTwo = EvaluatorRequest.newBuilder()
+        .setNumber(2)
+        .setMemory(128)
+        .setNumberOfCores(2)
+        .build();
+    {
+      evaluatorRequestor.submit(requestOne);
+      Assert.assertEquals("Request in REEF and YARN form should have the same number of Evaluators",
+          requestOne.getNumber(),
+          containerRequestHandler.getRequests().length
+      );
+    }
+        {
+      evaluatorRequestor.submit(requestTwo);
+      Assert.assertEquals("Request in REEF and YARN form should have the same number of Evaluators",
+          requestTwo.getNumber(),
+          containerRequestHandler.getRequests().length
+      );
+    }
+    {
+      evaluatorRequestor.submit(requestTwo);
+      Assert.assertNotEquals("Request in REEF and YARN form should have the same number of Evaluators",
+          requestOne.getNumber(),
+          containerRequestHandler.getRequests().length
+      );
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tang/.gitattributes
----------------------------------------------------------------------
diff --git a/lang/java/reef-tang/.gitattributes b/lang/java/reef-tang/.gitattributes
new file mode 100644
index 0000000..db5b15f
--- /dev/null
+++ b/lang/java/reef-tang/.gitattributes
@@ -0,0 +1,3 @@
+# Commit text files using LF endings
+*.java text eol=lf whitespace=trailing-space,space-before-tab,tab-in-indent,blank-at-eof
+* text=auto

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-tang/.gitignore
----------------------------------------------------------------------
diff --git a/lang/java/reef-tang/.gitignore b/lang/java/reef-tang/.gitignore
new file mode 100644
index 0000000..7e02bcd
--- /dev/null
+++ b/lang/java/reef-tang/.gitignore
@@ -0,0 +1,15 @@
+tmp
+bin
+tang.conf
+.DS_Store
+target
+generated
+.settings
+.classpath
+.project
+.sw[op]
+*.sw[op]
+.externalToolBuilders
+*~
+*.iml
+.idea