You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/09 22:01:02 UTC

incubator-reef git commit: [REEF-468] Support multiple compute and data requests in DataLoader

Repository: incubator-reef
Updated Branches:
  refs/heads/master 3aaca1841 -> 87c181dc8


[REEF-468] Support multiple compute and data requests in DataLoader

This commit allows requesting a user-defined number of compute and data
evaluators from the DataLoader. Each evaluator can be requested to run in
a particular location, using rack information. Because the
DataLoadingRequestBuilder already has clients, we preserve its previous
functionality, although several fields are now marked as deprecated.
Implementing different strategies for assigning partitions to the
evaluators is still pending. For now, we use the previous implementation
(it tries to match the host and, failing that, assigns partitions randomly).
EvaluatorRequest.Builder was also updated to make "cloning" a request easier.

JIRA:
  [REEF-468](https://issues.apache.org/jira/browse/REEF-468)

Pull Request:
  This closes #287


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/87c181dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/87c181dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/87c181dc

Branch: refs/heads/master
Commit: 87c181dc817d854f5d62c6a4ecf803b8673a984f
Parents: 3aaca18
Author: nachocano <na...@gmail.com>
Authored: Thu Jul 9 00:41:01 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Thu Jul 9 13:00:18 2015 -0700

----------------------------------------------------------------------
 .../reef/driver/evaluator/EvaluatorRequest.java |  14 ++
 .../reef/io/data/loading/api/DataLoader.java    | 150 +++++++++++++-----
 .../loading/api/DataLoadingRequestBuilder.java  | 152 +++++++++++++++++--
 3 files changed, 263 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/87c181dc/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
index 26dd34e..dbaf6bd 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
@@ -166,9 +166,23 @@ public final class EvaluatorRequest {
     private Builder() {
     }
 
+    /**
+     * Pre-populates the builder with the values extracted from the request.
+     *
+     * @param request
+     *          the request
+     */
     private Builder(final EvaluatorRequest request) {
       setNumber(request.getNumber());
       fromDescriptor(request.getDescriptor());
+      setMemory(request.getMegaBytes());
+      setNumberOfCores(request.getNumberOfCores());
+      for (final String nodeName : request.getNodeNames()) {
+        addNodeName(nodeName);
+      }
+      for (final String rackName : request.getRackNames()) {
+        addRackName(rackName);
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/87c181dc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
index 00a8368..ef2779f 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.io.data.loading.api;
 
+import org.apache.commons.lang.Validate;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.context.ContextConfiguration;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
@@ -37,6 +38,10 @@ import org.apache.reef.wake.time.event.Alarm;
 import org.apache.reef.wake.time.event.StartTime;
 
 import javax.inject.Inject;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -67,17 +72,23 @@ public class DataLoader {
   private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue<>();
 
   private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0);
+  private final AtomicInteger numDataRequestsToSubmit = new AtomicInteger(0);
 
   private final DataLoadingService dataLoadingService;
-  private final int dataEvalMemoryMB;
-  private final int dataEvalCore;
-  private final EvaluatorRequest computeRequest;
+  private int dataEvalMemoryMB;
+  private int dataEvalCore;
   private final SingleThreadStage<EvaluatorRequest> resourceRequestStage;
   private final ResourceRequestHandler resourceRequestHandler;
-  private final int computeEvalMemoryMB;
-  private final int computeEvalCore;
+  private int computeEvalMemoryMB;
+  private int computeEvalCore;
   private final EvaluatorRequestor requestor;
 
+  /**
+   * @deprecated since 0.12. Should use the other constructor instead, which
+   *             allows to specify different compute requests (i.e. masters) and
+   *             data requests (i.e. slaves), in particular racks
+   */
+  @Deprecated
   @Inject
   public DataLoader(
       final Clock clock,
@@ -86,7 +97,38 @@ public class DataLoader {
       @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorMemoryMB.class) final int dataEvalMemoryMB,
       @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorNumberOfCores.class) final int dataEvalCore,
       @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequest.class) final String serializedComputeRequest) {
+    this(clock, requestor, dataLoadingService, new HashSet<String>(
+        Arrays.asList(serializedComputeRequest)), new HashSet<String>(
+        Arrays.asList(EvaluatorRequestSerializer.serialize(EvaluatorRequest
+            .newBuilder().setMemory(dataEvalMemoryMB)
+            .setNumberOfCores(dataEvalCore).build()))));
+  }
 
+  /**
+   * Allows to specify compute and data evaluator requests in particular
+   * locations.
+   *
+   * @param clock
+   *          the clock
+   * @param requestor
+   *          the evaluator requestor
+   * @param dataLoadingService
+   *          the data loading service
+   * @param serializedComputeRequests
+   *          serialized compute requests (evaluators that will not load data)
+   * @param serializedDataRequests
+   *          serialized data requests (evaluators that will load data). It
+   *          cannot be empty (to maintain previous functionality)
+   */
+  @Inject
+  public DataLoader(
+      final Clock clock,
+      final EvaluatorRequestor requestor,
+      final DataLoadingService dataLoadingService,
+      @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequests.class) final Set<String> serializedComputeRequests,
+      @Parameter(DataLoadingRequestBuilder.DataLoadingDataRequests.class) final Set<String> serializedDataRequests) {
+    // data requests should not be empty. This maintains previous functionality
+    Validate.notEmpty(serializedDataRequests, "Should contain a data request object");
     // FIXME: Issue #855: We need this alarm to look busy for REEF.
     clock.scheduleAlarm(30000, new EventHandler<Alarm>() {
       @Override
@@ -97,33 +139,48 @@ public class DataLoader {
 
     this.requestor = requestor;
     this.dataLoadingService = dataLoadingService;
-    this.dataEvalMemoryMB = dataEvalMemoryMB;
-    this.dataEvalCore = dataEvalCore;
     this.resourceRequestHandler = new ResourceRequestHandler(requestor);
-    this.resourceRequestStage = new SingleThreadStage<>(this.resourceRequestHandler, 2);
+    // the resource request queue will have as many requests as compute and data requests.
+    this.resourceRequestStage = new SingleThreadStage<>(
+        this.resourceRequestHandler, serializedComputeRequests.size()
+            + serializedDataRequests.size());
 
-    if (serializedComputeRequest.equals("NULL")) {
-      this.computeRequest = null;
+    if (serializedComputeRequests.isEmpty()) {
       this.computeEvalMemoryMB = -1;
-      computeEvalCore = 1;
+      this.computeEvalCore = 1;
     } else {
-      this.computeRequest = EvaluatorRequestSerializer.deserialize(serializedComputeRequest);
-      this.computeEvalMemoryMB = this.computeRequest.getMegaBytes();
-      this.computeEvalCore = this.computeRequest.getNumberOfCores();
-      this.numComputeRequestsToSubmit.set(this.computeRequest.getNumber());
-
-      this.resourceRequestStage.onNext(this.computeRequest);
+      // Deserialize each compute request.
+      // Keep the maximum number of cores and memory requested, in case some
+      // evaluator fails, we will try to reallocate based on that.
+      for (final String serializedComputeRequest : serializedComputeRequests) {
+        final EvaluatorRequest computeRequest = EvaluatorRequestSerializer.deserialize(serializedComputeRequest);
+        this.numComputeRequestsToSubmit.addAndGet(computeRequest.getNumber());
+        this.computeEvalMemoryMB = Math.max(this.computeEvalMemoryMB, computeRequest.getMegaBytes());
+        this.computeEvalCore = Math.max(this.computeEvalCore, computeRequest.getNumberOfCores());
+        this.resourceRequestStage.onNext(computeRequest);
+      }
+    }
+    // Deserialize each data requests.
+    // We distribute the partitions evenly accross the DCs.
+    // The number of partitions extracted from the dataLoadingService override
+    // the number of evaluators requested (this preserves previous functionality)
+    final int dcs = serializedDataRequests.size();
+    final int partitionsPerDataCenter = this.dataLoadingService.getNumberOfPartitions() / dcs;
+    int missing = this.dataLoadingService.getNumberOfPartitions() % dcs;
+    for (final String serializedDataRequest : serializedDataRequests) {
+      EvaluatorRequest dataRequest = EvaluatorRequestSerializer.deserialize(serializedDataRequest);
+      this.dataEvalMemoryMB = Math.max(this.dataEvalMemoryMB, dataRequest.getMegaBytes());
+      this.dataEvalCore = Math.max(this.dataEvalCore, dataRequest.getNumberOfCores());
+      // clone the request but update the number of evaluators based on the number of partitions
+      int number = partitionsPerDataCenter;
+      if (missing > 0) {
+        number++;
+        missing--;
+      }
+      dataRequest = EvaluatorRequest.newBuilder(dataRequest).setNumber(number).build();
+      this.numDataRequestsToSubmit.addAndGet(number);
+      this.resourceRequestStage.onNext(dataRequest);
     }
-
-    this.resourceRequestStage.onNext(getDataLoadingRequest());
-  }
-
-  private EvaluatorRequest getDataLoadingRequest() {
-    return EvaluatorRequest.newBuilder()
-        .setNumber(this.dataLoadingService.getNumberOfPartitions())
-        .setMemory(this.dataEvalMemoryMB)
-        .setNumberOfCores(this.dataEvalCore)
-        .build();
   }
 
   public class StartHandler implements EventHandler<StartTime> {
@@ -165,27 +222,35 @@ public class DataLoader {
       }
 
       final int evaluatorsForComputeRequest = numComputeRequestsToSubmit.decrementAndGet();
-      LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest);
 
       if (evaluatorsForComputeRequest >= 0) {
+        LOG.log(Level.FINE, "Evaluators for compute request: {0}", evaluatorsForComputeRequest);
         try {
-          final Configuration idConfiguration = ContextConfiguration.CONF
-              .set(ContextConfiguration.IDENTIFIER,
-                  dataLoadingService.getComputeContextIdPrefix() + evaluatorsForComputeRequest)
-              .build();
+          final Configuration idConfiguration = ContextConfiguration.CONF.set(
+              ContextConfiguration.IDENTIFIER,
+              dataLoadingService.getComputeContextIdPrefix()
+                  + evaluatorsForComputeRequest).build();
           LOG.log(Level.FINE, "Submitting Compute Context to {0}", evalId);
           allocatedEvaluator.submitContext(idConfiguration);
-          submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), idConfiguration);
-          if (evaluatorsForComputeRequest == 0) {
-            LOG.log(Level.FINE, "All Compute requests satisfied. Releasing gate");
-            resourceRequestHandler.releaseResourceRequestGate();
-          }
+          submittedComputeEvalConfigs.put(allocatedEvaluator.getId(),
+              idConfiguration);
+          // should release the request gate when there are >= 0 compute
+          // requests (now that we can have more than 1)
+          LOG.log(
+              Level.FINE,
+              evaluatorsForComputeRequest > 0 ? "More Compute requests need to be satisfied"
+                  : "All Compute requests satisfied." + " Releasing gate");
+          resourceRequestHandler.releaseResourceRequestGate();
         } catch (final BindException e) {
-          throw new RuntimeException("Unable to bind context id for Compute request", e);
+          throw new RuntimeException(
+              "Unable to bind context id for Compute request", e);
         }
 
       } else {
 
+        final int evaluatorsForDataRequest = numDataRequestsToSubmit.decrementAndGet();
+        LOG.log(Level.FINE, "Evaluators for data request: {0}", evaluatorsForDataRequest);
+
         final Pair<Configuration, Configuration> confPair = new Pair<>(
             dataLoadingService.getContextConfiguration(allocatedEvaluator),
             dataLoadingService.getServiceConfiguration(allocatedEvaluator));
@@ -193,6 +258,15 @@ public class DataLoader {
         LOG.log(Level.FINE, "Submitting data loading context to {0}", evalId);
         allocatedEvaluator.submitContextAndService(confPair.first, confPair.second);
         submittedDataEvalConfigs.put(allocatedEvaluator.getId(), confPair);
+
+        // release the gate to keep on asking for more "data" evaluators.
+        if (evaluatorsForDataRequest > 0) {
+          LOG.log(Level.FINE, "More Data requests need to be satisfied. Releasing gate");
+          resourceRequestHandler.releaseResourceRequestGate();
+        // don't need to release if it's 0
+        } else if (evaluatorsForDataRequest == 0) {
+          LOG.log(Level.FINE, "All Data requests satisfied");
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/87c181dc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
index 66512cb..380ef09 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.io.data.loading.api;
 
+import org.apache.commons.lang.Validate;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -35,16 +36,35 @@ import org.apache.reef.tang.annotations.NamedParameter;
 import org.apache.reef.tang.exceptions.BindException;
 import org.apache.reef.tang.formats.ConfigurationModule;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
 /**
  * Builder to create a request to the DataLoadingService.
  */
 public final class DataLoadingRequestBuilder
     implements org.apache.reef.util.Builder<Configuration> {
 
-  private int memoryMB = -1;
-  private int numberOfCores = -1;
-  private int numberOfDesiredSplits = -1;
-  private EvaluatorRequest computeRequest = null;
+  // constant used in several places.
+  private static final int UNINITIALIZED = -1;
+
+  /**
+   * @deprecated since 0.12. Should use instead
+   *             {@link DataLoadingRequestBuilder#dataRequests}
+   */
+  @Deprecated
+  private int memoryMB = UNINITIALIZED;
+  /**
+   * @deprecated since 0.12. Should use instead
+   *             {@link DataLoadingRequestBuilder#dataRequests}
+   */
+  @Deprecated
+  private int numberOfCores = UNINITIALIZED;
+  private int numberOfDesiredSplits = UNINITIALIZED;
+  private List<EvaluatorRequest> computeRequests = new ArrayList<>();
+  private final List<EvaluatorRequest> dataRequests = new ArrayList<>();
   private boolean inMemory = false;
   private boolean renewFailedEvaluators = true;
   private ConfigurationModule driverConfigurationModule = null;
@@ -78,8 +98,71 @@ public final class DataLoadingRequestBuilder
     return this;
   }
 
+  /**
+   * Adds the requests to the compute requests list.
+   *
+   * @param computeRequests
+   *          the compute requests to add
+   * @return this
+   */
+  public DataLoadingRequestBuilder addComputeRequests(final List<EvaluatorRequest> computeRequests) {
+    for (final EvaluatorRequest computeRequest : computeRequests) {
+      addComputeRequest(computeRequest);
+    }
+    return this;
+  }
+
+  /**
+   * Adds the requests to the data requests list.
+   *
+   * @param dataRequests
+   *          the data requests to add
+   * @return this
+   */
+  public DataLoadingRequestBuilder addDataRequests(final List<EvaluatorRequest> dataRequests) {
+    for (final EvaluatorRequest dataRequest : dataRequests) {
+      addDataRequest(dataRequest);
+    }
+    return this;
+  }
+
+  /**
+   * Adds a single request to the compute requests list.
+   *
+   * @param computeRequest
+   *          the compute request to add
+   * @return this
+   */
+  public DataLoadingRequestBuilder addComputeRequest(final EvaluatorRequest computeRequest) {
+    this.computeRequests.add(computeRequest);
+    return this;
+  }
+
+  /**
+   * Adds a single request to the data requests list.
+   *
+   * @param dataRequest
+   *          the data request to add
+   * @return this
+   */
+  public DataLoadingRequestBuilder addDataRequest(final EvaluatorRequest dataRequest) {
+    this.dataRequests.add(dataRequest);
+    return this;
+  }
+
+  /**
+   * Sets the compute request.
+   *
+   * @deprecated since 0.12. Should use instead
+   *             {@link DataLoadingRequestBuilder#addComputeRequest(EvaluatorRequest)}
+   *             or {@link DataLoadingRequestBuilder#addComputeRequests(List)}
+   * @param computeRequest
+   *          the compute request
+   * @return this
+   */
+  @Deprecated
   public DataLoadingRequestBuilder setComputeRequest(final EvaluatorRequest computeRequest) {
-    this.computeRequest = computeRequest;
+    this.computeRequests = new ArrayList<>(Arrays.asList(computeRequest));
     return this;
   }
 
@@ -145,17 +228,34 @@ public final class DataLoadingRequestBuilder
       jcb.bindNamedParameter(NumberOfDesiredSplits.class, "" + this.numberOfDesiredSplits);
     }
 
-    if (this.memoryMB > 0) {
-      jcb.bindNamedParameter(DataLoadingEvaluatorMemoryMB.class, "" + this.memoryMB);
+    // if empty, then the user code still uses the deprecated fields.
+    // we create a dataLoadRequest object based on them (or their default values)
+    if (this.dataRequests.isEmpty()) {
+      final int dataMemoryMB = this.memoryMB > 0 ? this.memoryMB : Integer
+          .valueOf(DataLoadingEvaluatorMemoryMB.DEFAULT_DATA_MEMORY);
+      final int dataCores = this.numberOfCores > 0 ? this.numberOfCores : Integer
+          .valueOf(DataLoadingEvaluatorNumberOfCores.DEFAULT_DATA_CORES);
+      final EvaluatorRequest defaultDataRequest = EvaluatorRequest.newBuilder().setMemory(dataMemoryMB)
+          .setNumberOfCores(dataCores).build();
+      this.dataRequests.add(defaultDataRequest);
+    } else {
+      // if there are dataRequests, make sure the user did not configure the
+      // memory or the number of cores (deprecated API), as they will be discarded
+      Validate.isTrue(this.numberOfCores == UNINITIALIZED && this.memoryMB == UNINITIALIZED,
+          "Should not set number of cores or memory if you added specific data requests");
     }
 
-    if (this.numberOfCores > 0) {
-      jcb.bindNamedParameter(DataLoadingEvaluatorNumberOfCores.class, "" + this.numberOfCores);
+    // at this point data requests cannot be empty, either we use the one we created based on the
+    // deprecated fields, or the ones created by the user
+    for (final EvaluatorRequest request : this.dataRequests) {
+      jcb.bindSetEntry(DataLoadingDataRequests.class, EvaluatorRequestSerializer.serialize(request));
     }
 
-    if (this.computeRequest != null) {
-      jcb.bindNamedParameter(DataLoadingComputeRequest.class,
-          EvaluatorRequestSerializer.serialize(this.computeRequest));
+    // compute requests can be empty to maintain compatibility with previous code.
+    if (!this.computeRequests.isEmpty()) {
+      for (final EvaluatorRequest request : this.computeRequests) {
+        jcb.bindSetEntry(DataLoadingComputeRequests.class, EvaluatorRequestSerializer.serialize(request));
+      }
     }
 
     return jcb
@@ -172,18 +272,40 @@ public final class DataLoadingRequestBuilder
   public static final class NumberOfDesiredSplits implements Name<Integer> {
   }
 
-  @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB", default_value = "4096")
+  @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB", default_value = DataLoadingEvaluatorMemoryMB.DEFAULT_DATA_MEMORY)
   public static final class DataLoadingEvaluatorMemoryMB implements Name<Integer> {
+    static final String DEFAULT_DATA_MEMORY = "4096";
   }
 
-  @NamedParameter(short_name = "dataLoadingEvaluatorCore", default_value = "1")
+  @NamedParameter(short_name = "dataLoadingEvaluatorCore", default_value = DataLoadingEvaluatorNumberOfCores.DEFAULT_DATA_CORES)
   public static final class DataLoadingEvaluatorNumberOfCores implements Name<Integer> {
+    static final String DEFAULT_DATA_CORES = "1";
   }
 
-  @NamedParameter(default_value = "NULL")
+  /**
+   * @deprecated since 0.12. Should use instead DataLoadingComputeRequests. No
+   *             need for the default value anymore, it is handled in the
+   *             DataLoader side in order to disambiguate constructors
+   */
+  @Deprecated
+  @NamedParameter
   public static final class DataLoadingComputeRequest implements Name<String> {
   }
 
+  /**
+   * Allows to specify a set of compute requests to send to the DataLoader.
+   */
+  @NamedParameter(doc = "Sets of compute requests to request to the DataLoader, i.e. evaluators requests that will not load data")
+  static final class DataLoadingComputeRequests implements Name<Set<String>> {
+  }
+
+  /**
+   * Allows to specify a set of data requests to send to the DataLoader.
+   */
+  @NamedParameter(doc = "Sets of data requests to request to the DataLoader, i.e. evaluators requests that will load data")
+  static final class DataLoadingDataRequests implements Name<Set<String>> {
+  }
+
   @NamedParameter(default_value = "false")
   public static final class LoadDataIntoMemory implements Name<Boolean> {
   }