You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/15 04:36:51 UTC

incubator-reef git commit: [REEF-469] Assign data splits to specific Evaluators

Repository: incubator-reef
Updated Branches:
  refs/heads/master f78c88c11 -> 33812fb01


[REEF-469] Assign data splits to specific Evaluators

This commit allows data splits to be assigned to specific evaluators using the
DataLoadingService. Several changes were made. The main one is to inject a
strategy into InputFormatLoadingService; the strategy is in charge of assigning
splits to evaluators. New data types had to be created. For example,
DistributedDataSetPartition, which is essentially a folder with a location
field where the user can specify where she would like that data to be loaded
(on a certain node or rack). We now have LocationAwareJobConfs, which bind the
typical Hadoop JobConf to the DataPartition objects. The
EvaluatorToPartitionMapper class has been removed; most of its logic now lives
in AbstractEvaluatorToPartitionStrategy.

JIRA:
  [REEF-469](https://issues.apache.org/jira/browse/REEF-469)

Pull Request:
  This closes #290


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/33812fb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/33812fb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/33812fb0

Branch: refs/heads/master
Commit: 33812fb0117ba65c92ca96b18f2eeea5d38101d2
Parents: f78c88c
Author: Ignacio Cano <na...@gmail.com>
Authored: Fri Jul 10 14:39:26 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Tue Jul 14 19:10:56 2015 -0700

----------------------------------------------------------------------
 .../loading/api/DataLoadingRequestBuilder.java  |  86 +++++--
 .../io/data/loading/api/DistributedDataSet.java | 110 ++++++++
 .../api/EvaluatorToPartitionStrategy.java       |  59 +++++
 .../AbstractEvaluatorToPartitionStrategy.java   | 254 +++++++++++++++++++
 .../impl/DistributedDataSetPartition.java       | 179 +++++++++++++
 .../DistributedDataSetPartitionSerializer.java  |  66 +++++
 .../impl/EvaluatorToPartitionMapper.java        | 153 -----------
 .../loading/impl/InputFormatLoadingService.java |  65 ++---
 ...iDataCenterEvaluatorToPartitionStrategy.java | 152 +++++++++++
 .../io/data/loading/impl/NumberedSplit.java     |  26 +-
 ...eDataCenterEvaluatorToPartitionStrategy.java |  75 ++++++
 11 files changed, 1018 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
index 53e8433..f227374 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
@@ -20,14 +20,16 @@ package org.apache.reef.io.data.loading.api;
 
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.io.data.loading.impl.DistributedDataSetPartitionSerializer;
 import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
-import org.apache.reef.io.data.loading.impl.InputFormatExternalConstructor;
+import org.apache.reef.io.data.loading.impl.SingleDataCenterEvaluatorToPartitionStrategy;
+import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
 import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
 import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
+import org.apache.reef.io.data.loading.impl.MultiDataCenterEvaluatorToPartitionStrategy;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.JavaConfigurationBuilder;
 import org.apache.reef.tang.Tang;
@@ -38,6 +40,7 @@ import org.apache.reef.tang.formats.ConfigurationModule;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -69,7 +72,16 @@ public final class DataLoadingRequestBuilder
   private boolean renewFailedEvaluators = true;
   private ConfigurationModule driverConfigurationModule = null;
   private String inputFormatClass;
-  private String inputPath;
+  /**
+   * Single data center loading strategy flag. Specifies whether the data
+   * will be loaded into machines of a single data center or not. By
+   * default, it is set to true.
+   */
+  private boolean singleDataCenterStrategy = true;
+  /**
+   * Distributed dataset that can contain many distributed partitions.
+   */
+  private DistributedDataSet distributedDataSet;
 
   public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) {
     this.numberOfDesiredSplits = numberOfDesiredSplits;
@@ -188,8 +200,38 @@ public final class DataLoadingRequestBuilder
     return this;
   }
 
+  /**
+   * Sets the path of the folder where the data is. Internally it constructs a
+   * distributed data set with a single partition that uses the default number
+   * of desired splits and can be loaded into any location.
+   *
+   * @deprecated since 0.12. Use
+   *             {@link DataLoadingRequestBuilder#setDistributedDataSet(DistributedDataSet)} instead.
+   * @param inputPath
+   *          the input path
+   * @return this
+   */
+  @Deprecated
   public DataLoadingRequestBuilder setInputPath(final String inputPath) {
-    this.inputPath = inputPath;
+    final DistributedDataSet dds = new DistributedDataSet();
+    dds.addPartition(DistributedDataSetPartition.newBuilder().setPath(inputPath)
+        .setLocation(DistributedDataSetPartition.LOAD_INTO_ANY_LOCATION)
+        .setDesiredSplits(Integer.valueOf(NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)).build());
+    this.singleDataCenterStrategy = true;
+    this.distributedDataSet = dds;
+    return this;
+  }
+
+  /**
+   * Sets the distributed data set.
+   *
+   * @param distributedDataSet
+   *          the distributed data set
+   * @return this
+   */
+  public DataLoadingRequestBuilder setDistributedDataSet(final DistributedDataSet distributedDataSet) {
+    this.distributedDataSet = distributedDataSet;
+    this.singleDataCenterStrategy = false;
     return this;
   }
 
@@ -199,8 +241,8 @@ public final class DataLoadingRequestBuilder
       throw new BindException("Driver Configuration Module is a required parameter.");
     }
 
-    if (this.inputPath == null) {
-      throw new BindException("InputPath is a required parameter.");
+    if (this.distributedDataSet == null || this.distributedDataSet.isEmpty()) {
+      throw new BindException("Distributed Data Set is a required parameter.");
     }
 
     if (this.inputFormatClass == null) {
@@ -258,18 +300,32 @@ public final class DataLoadingRequestBuilder
       }
     }
 
-    return jcb
-        .bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
-        .bindConstructor(InputFormat.class, InputFormatExternalConstructor.class)
-        .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
-        .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
-        .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
-        .bindImplementation(DataLoadingService.class, InputFormatLoadingService.class)
-        .build();
+    jcb.bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
+       .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass);
+
+    final Iterator<DistributedDataSetPartition> partitions = this.distributedDataSet.iterator();
+    while (partitions.hasNext()) {
+      jcb.bindSetEntry(
+          DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class,
+          DistributedDataSetPartitionSerializer.serialize(partitions.next()));
+    }
+
+    // We do this check for backwards compatibility: if the user specified that she
+    // wants to use the single data center loading strategy, we bind that implementation.
+    if (this.singleDataCenterStrategy) {
+      jcb.bindImplementation(EvaluatorToPartitionStrategy.class, SingleDataCenterEvaluatorToPartitionStrategy.class);
+    } else {
+      // otherwise, we bind the strategy that will allow the user to specify
+      // which evaluators can load the different partitions in a multi data center network topology
+      jcb.bindImplementation(EvaluatorToPartitionStrategy.class, MultiDataCenterEvaluatorToPartitionStrategy.class);
+    }
+
+    return jcb.bindImplementation(DataLoadingService.class, InputFormatLoadingService.class).build();
   }
 
-  @NamedParameter(short_name = "num_splits", default_value = "0")
+  @NamedParameter(short_name = "num_splits", default_value = NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)
   public static final class NumberOfDesiredSplits implements Name<Integer> {
+    static final String DEFAULT_DESIRED_SPLITS = "0";
   }
 
   @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB",

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
new file mode 100644
index 0000000..e95ca3e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Represents a distributed data set that is split across data centers.
+ * It contains a set of {@link DistributedDataSetPartition}s, each of which
+ * describes where its data should be loaded into.
+ *
+ */
+@Unstable
+public final class DistributedDataSet implements Iterable<DistributedDataSetPartition> {
+
+
+  /**
+   * The set of distributed data set partitions.
+   */
+  private final Set<DistributedDataSetPartition> partitions = new HashSet<>();
+
+  /**
+   * Adds the given partition to the set.
+   *
+   * @param partition
+   *          the partition to add
+   */
+  public void addPartition(final DistributedDataSetPartition partition) {
+    this.partitions.add(partition);
+  }
+
+  /**
+   * Adds the given partitions to the set.
+   *
+   * @param partitions
+   *          the partitions to add
+   */
+  public void addPartitions(final Collection<DistributedDataSetPartition> partitions) {
+    this.partitions.addAll(partitions);
+  }
+
+  /**
+   * Returns true if it does not contain any partition.
+   *
+   * @return a boolean indicating whether it contains partitions or not
+   */
+  public boolean isEmpty() {
+    return this.partitions.isEmpty();
+  }
+
+  @Override
+  public Iterator<DistributedDataSetPartition> iterator() {
+    return new DistributedDataSetIterator(partitions);
+  }
+
+  static final class DistributedDataSetIterator implements Iterator<DistributedDataSetPartition> {
+    // Iterates over a snapshot of the partitions copied at construction time.
+    private final List<DistributedDataSetPartition> partitions;
+    private int position;
+
+    public DistributedDataSetIterator(
+        final Collection<DistributedDataSetPartition> partitions) {
+      this.partitions = new LinkedList<DistributedDataSetPartition>(partitions);
+      position = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return position < partitions.size();
+    }
+
+    @Override
+    public DistributedDataSetPartition next() {
+      if (!hasNext()) { // Iterator contract: signal exhaustion with NoSuchElementException
+        throw new java.util.NoSuchElementException("No more partitions in this distributed data set");
+      }
+      return partitions.get(position++);
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "Remove method has not been implemented in this iterator");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..d10b4d4
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.io.data.loading.impl.NumberedSplit;
+
+/**
+ * Interface that tracks the mapping between evaluators and the data partitions
+ * assigned to those evaluators. It is part of the implementation of a
+ * {@link org.apache.reef.io.data.loading.api.DataLoadingService} that uses the
+ * Hadoop {@link org.apache.hadoop.mapred.InputFormat} to partition the data and
+ * request resources accordingly.
+ *
+ * @param <V> the type of input split handled by this strategy
+ */
+@DriverSide
+@Unstable
+public interface EvaluatorToPartitionStrategy<V extends InputSplit> {
+
+  /**
+   * Returns an input split for the given evaluator.
+   * @param nodeDescriptor
+   *      the node descriptor where the evaluator is running on
+   * @param evalId
+   *      the evaluator id
+   * @return
+   *      the numberedSplit
+   * @throws RuntimeException if no split could be allocated
+   */
+  NumberedSplit<V> getInputSplit(NodeDescriptor nodeDescriptor, String evalId);
+
+  /**
+   * Returns the total number of splits computed in this strategy.
+   * @return
+   *  the number of splits
+   */
+  int getNumberOfSplits();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..f6ea0dd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.lang.Validate;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
+import org.apache.reef.tang.ExternalConstructor;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is an abstract class useful for {@link EvaluatorToPartitionStrategy}
+ * implementations. It contains a template implementation of
+ * {@link EvaluatorToPartitionStrategy#getInputSplit(NodeDescriptor, String)}
+ * that calls abstract methods implemented by subclasses. If your implementation
+ * does not need this logic, you should just implement the
+ * {@link EvaluatorToPartitionStrategy} interface and not extend this class.
+ */
+@DriverSide
+@Unstable
+public abstract class AbstractEvaluatorToPartitionStrategy implements EvaluatorToPartitionStrategy<InputSplit> {
+  private static final Logger LOG = Logger.getLogger(AbstractEvaluatorToPartitionStrategy.class.getName());
+
+  protected final ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> locationToSplits;;
+  protected final ConcurrentMap<String, NumberedSplit<InputSplit>> evaluatorToSplits;
+  protected final BlockingQueue<NumberedSplit<InputSplit>> unallocatedSplits;
+
+  private int totalNumberOfSplits;
+
+  @SuppressWarnings("rawtypes")
+  AbstractEvaluatorToPartitionStrategy(
+      final String inputFormatClassName, final Set<String> serializedDataPartitions) {
+    LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
+    Validate.notEmpty(inputFormatClassName);
+    Validate.notEmpty(serializedDataPartitions);
+
+    locationToSplits = new ConcurrentHashMap<>();
+    evaluatorToSplits = new ConcurrentHashMap<>();
+    unallocatedSplits = new LinkedBlockingQueue<>();
+    setUp();
+
+    final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = new HashMap<>();
+    for (final String serializedDataPartition : serializedDataPartitions) {
+      final DistributedDataSetPartition dp = DistributedDataSetPartitionSerializer.deserialize(serializedDataPartition);
+      final ExternalConstructor<JobConf> jobConfExternalConstructor = new JobConfExternalConstructor(
+          inputFormatClassName, dp.getPath());
+      try {
+        final JobConf jobConf = jobConfExternalConstructor.newInstance();
+        final InputFormat inputFormat = jobConf.getInputFormat();
+        final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, dp.getDesiredSplits());
+        if (LOG.isLoggable(Level.FINEST)) {
+          LOG.log(Level.FINEST, "Splits for partition: {0} {1}", new Object[] {dp, Arrays.toString(inputSplits)});
+        }
+        this.totalNumberOfSplits += inputSplits.length;
+        splitsPerPartition.put(dp, inputSplits);
+      } catch (final IOException e) {
+        throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
+      }
+    }
+    init(splitsPerPartition);
+    LOG.log(Level.FINE, "Total Number of splits: {0}", this.totalNumberOfSplits);
+  }
+
+  /**
+   * Initializes the locations where we'd like the splits to be loaded into.
+   * Sets all the splits to unallocated.
+   *
+   * @param splitsPerPartition
+   *          a map containing the input splits per data partition
+   */
+  private void init(final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) {
+    final Pair<InputSplit[], DistributedDataSetPartition[]>
+                                      splitsAndPartitions = getSplitsAndPartitions(splitsPerPartition);
+    final InputSplit[] splits = splitsAndPartitions.getFirst();
+    final DistributedDataSetPartition[] partitions = splitsAndPartitions.getSecond();
+    Validate.isTrue(splits.length == partitions.length);
+    for (int splitNum = 0; splitNum < splits.length; splitNum++) {
+      LOG.log(Level.FINE, "Processing split: " + splitNum);
+      final InputSplit split = splits[splitNum];
+      final NumberedSplit<InputSplit> numberedSplit = new NumberedSplit<InputSplit>(split, splitNum,
+          partitions[splitNum]);
+      unallocatedSplits.add(numberedSplit);
+      updateLocations(numberedSplit);
+    }
+    if (LOG.isLoggable(Level.FINE)) {
+      for (final Map.Entry<String, BlockingQueue<NumberedSplit<InputSplit>>> locSplit : locationToSplits.entrySet()) {
+        LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString());
+      }
+    }
+  }
+
+  /**
+   * Each strategy should update the locations where it wants the split to be
+   * loaded into. For example, the split's physical location, a certain node,
+   * or a certain rack.
+   *
+   * @param numberedSplit
+   *          the numberedSplit
+   */
+  protected abstract void updateLocations(final NumberedSplit<InputSplit> numberedSplit);
+
+  /**
+   * Tries to allocate a split in an evaluator based on some particular rule.
+   * For example, based on the rack name, randomly, etc.
+   *
+   * @param nodeDescriptor
+   *          the node descriptor to extract information from
+   * @param evaluatorId
+   *          the evaluator id where we want to allocate the numberedSplit
+   * @return a numberedSplit or null if couldn't allocate one
+   */
+  protected abstract NumberedSplit<InputSplit> tryAllocate(NodeDescriptor nodeDescriptor, String evaluatorId);
+
+  /**
+   * Called in the constructor. Allows children to set up the objects they will
+   * need in
+   * {@link AbstractEvaluatorToPartitionStrategy#updateLocations(NumberedSplit)}
+   * and
+   * {@link AbstractEvaluatorToPartitionStrategy#tryAllocate(NodeDescriptor, String)}
+   * methods.
+   * By default we provide an empty implementation.
+   */
+  protected void setUp() {
+    // empty implementation by default
+  }
+
+  /**
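+   * Gets an input split to be assigned to this evaluator.
+   * <p/>
+   * Allocates one if it is not already allocated.
+   *
+   * @param nodeDescriptor the descriptor of the node the evaluator is running on
+   * @param evaluatorId the id of the evaluator the split is assigned to
+   * @return a numberedSplit
+   * @throws RuntimeException if it could not find any split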
+   */
+  @Override
+  public NumberedSplit<InputSplit> getInputSplit(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
+    synchronized (evaluatorToSplits) {
+      if (evaluatorToSplits.containsKey(evaluatorId)) {
+        LOG.log(Level.FINE, "Found an already allocated split, {0}", evaluatorToSplits.toString());
+        return evaluatorToSplits.get(evaluatorId);
+      }
+    }
+    // always first try to allocate based on the hostName
+    final String hostName = nodeDescriptor.getName();
+    LOG.log(Level.FINE, "Allocated split not found, trying on {0}", hostName);
+    if (locationToSplits.containsKey(hostName)) {
+      LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[] {evaluatorId, hostName});
+      final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, locationToSplits.get(hostName));
+      if (split != null) {
+        return split;
+      }
+    }
+    LOG.log(Level.FINE, "{0} does not host any splits or someone else took splits hosted here. Picking other ones",
+        hostName);
+    final NumberedSplit<InputSplit> split = tryAllocate(nodeDescriptor, evaluatorId);
+    if (split == null) {
+      throw new RuntimeException("Unable to find an input split to evaluator " + evaluatorId);
+    } else {
+      LOG.log(Level.FINE, evaluatorToSplits.toString());
+    }
+    return split;
+  }
+
+  @Override
+  public int getNumberOfSplits() {
+    return this.totalNumberOfSplits;
+  }
+
+  private Pair<InputSplit[], DistributedDataSetPartition[]> getSplitsAndPartitions(
+      final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) {
+    final List<InputSplit> inputSplits = new ArrayList<>();
+    final List<DistributedDataSetPartition> partitions = new ArrayList<>();
+    for (final Entry<DistributedDataSetPartition, InputSplit[]> entry : splitsPerPartition.entrySet()) {
+      final DistributedDataSetPartition partition = entry.getKey();
+      final InputSplit[] splits = entry.getValue();
+      for (final InputSplit split : splits) {
+        inputSplits.add(split);
+        partitions.add(partition);
+      }
+    }
+    return new Pair<>(inputSplits.toArray(new InputSplit[inputSplits.size()]),
+        partitions.toArray(new DistributedDataSetPartition[partitions.size()]));
+  }
+
+  /**
+   * Allocates the first available split from the given queue into the evaluator.
+   * A split polled from a location queue is only handed out if it can also be
+   * removed from the unallocated queue; otherwise the next split is tried.
+   * @param evaluatorId the evaluator id
+   * @param value the queue of splits to take from; may be null
+   * @return a numberedSplit, or null if the queue is null or has no unallocated split
+   * @throws RuntimeException if the evaluator already has a split assigned
+   */
+  protected NumberedSplit<InputSplit> allocateSplit(final String evaluatorId,
+      final BlockingQueue<NumberedSplit<InputSplit>> value) {
+    if (value == null) {
+      LOG.log(Level.FINE, "Queue of splits is null. Returning null");
+      return null;
+    }
+    while (true) {
+      final NumberedSplit<InputSplit> split = value.poll();
+      if (split == null) {
+        return null;
+      }
+      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
+        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
+        final NumberedSplit<InputSplit> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
+        if (old != null) {
+          throw new RuntimeException("Trying to assign different splits to the same evaluator is not supported");
+        } else {
+          LOG.log(Level.FINE, "Returning " + split.getIndex());
+          return split;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
new file mode 100644
index 0000000..7cc52c1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * POJO that represents a distributed data set partition. Basically, it contains the path where
+ * the data files are located for this partition, and the location where we want
+ * this data to be loaded into.
+ *
+ */
+@Unstable
+public final class DistributedDataSetPartition {
+
+  /**
+   * Constant to specify that the data partition could be loaded into any
+   * location.
+   */
+  public static final String LOAD_INTO_ANY_LOCATION = "/*";
+
+  /**
+   * The path of the distributed data set partition. If we use HDFS, it will be the
+   * hdfs path.
+   */
+  private final String path;
+
+  /**
+   * The location (either a rackName or a nodeName) where we want the data in
+   * this distributed partition to be loaded into. It can contain a wildcard at
+   * the end, for example /datacenter1/*.
+   */
+  private final String location;
+
+  /**
+   * Number of desired splits for this partition.
+   */
+  private final int desiredSplits;
+
+  DistributedDataSetPartition(final String path, final String location, final int desiredSplits) {
+    this.path = path;
+    this.location = location;
+    this.desiredSplits = desiredSplits;
+  }
+
+  /**
+   * Returns the path to the distributed data partition.
+   *
+   * @return the path of the distributed data partition
+   */
+  String getPath() {
+    return path;
+  }
+
+  /**
+   * Returns the location where we want the data in this partition to be loaded
+   * into.
+   *
+   * @return the location where to load this data into.
+   */
+  String getLocation() {
+    return location;
+  }
+
+  /**
+   * Returns the number of desired splits for this data partition.
+   *
+   * @return the number of desired splits
+   */
+  int getDesiredSplits() {
+    return desiredSplits;
+  }
+
+  /**
+   * @return a new DistributedDataSetPartition Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof DistributedDataSetPartition)) {
+      return false;
+    }
+    final DistributedDataSetPartition that = (DistributedDataSetPartition) obj;
+    return new EqualsBuilder().append(this.path, that.path).append(this.location, that.location)
+        .append(this.desiredSplits, that.desiredSplits).isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37).append(this.path).append(this.location).append(this.desiredSplits).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "{" + this.path + "," + this.location + "," + this.desiredSplits + "}";
+  }
+
+  /**
+   * {@link DistributedDataSetPartition}s are built using this Builder.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<DistributedDataSetPartition> {
+
+    private String path;
+    private String location;
+    private int desiredSplits;
+
+    private Builder() {
+    }
+
+    /**
+     * Sets the path of the distributed data set partition.
+     *
+     * @param path
+     *          the path to set
+     * @return this
+     */
+    public Builder setPath(final String path) {
+      this.path = path;
+      return this;
+    }
+
+
+    /**
+     * Sets the location where we want the data in this partition to be loaded
+     * into.
+     *
+     * @param location
+     *          the location to set
+     * @return this
+     */
+    public Builder setLocation(final String location) {
+      this.location = location;
+      return this;
+    }
+
+    /**
+     * Sets the desired number of splits for this partition.
+     * @param desiredSplits
+     *          the number of desired splits
+     * @return this
+     */
+    public Builder setDesiredSplits(final int desiredSplits) {
+      this.desiredSplits = desiredSplits;
+      return this;
+    }
+
+    /**
+     * Builds the {@link DistributedDataSetPartition}.
+     */
+    @Override
+    public DistributedDataSetPartition build() {
+      return new DistributedDataSetPartition(this.path, this.location, this.desiredSplits);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
new file mode 100644
index 0000000..60a4191
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.io.*;
+import java.util.Set;
+
+/**
+ * Serialize and deserialize {@link DistributedDataSetPartition} objects.
+ */
+public final class DistributedDataSetPartitionSerializer {
+
+  public static String serialize(final DistributedDataSetPartition partition) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      final DataOutputStream daos = new DataOutputStream(baos);
+      daos.writeUTF(partition.getPath());
+      daos.writeUTF(partition.getLocation());
+      daos.writeInt(partition.getDesiredSplits());
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to serialize distributed data partition", e);
+    }
+  }
+
+  public static DistributedDataSetPartition deserialize(final String serializedPartition) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedPartition))) {
+      final DataInputStream dais = new DataInputStream(bais);
+      return new DistributedDataSetPartition(dais.readUTF(), dais.readUTF(), dais.readInt());
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to de-serialize distributed data partition", e);
+    }
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private DistributedDataSetPartitionSerializer() {
+  }
+
+  /**
+   * Allows to specify a set of distributed data set partitions.
+   */
+  @NamedParameter(doc = "Sets of distributed data set partitions")
+  public static final class DistributedDataSetPartitions implements Name<Set<String>> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
deleted file mode 100644
index 3ac11e9..0000000
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.io.data.loading.impl;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.reef.annotations.audience.DriverSide;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Class that tracks the mapping between
- * evaluators & the data partition assigned
- * to those evaluators. Its part of the
- * implementation of a {@link org.apache.reef.io.data.loading.api.DataLoadingService}
- * that uses the Hadoop {@link org.apache.hadoop.mapred.InputFormat} to
- * partition the data and request resources
- * accordingly
- * <p/>
- * This is an online version which satisfies
- * requests in a greedy way.
- *
- * @param <V>
- */
-@DriverSide
-public class EvaluatorToPartitionMapper<V extends InputSplit> {
-  private static final Logger LOG = Logger
-      .getLogger(EvaluatorToPartitionMapper.class.getName());
-
-  private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> locationToSplits = new ConcurrentHashMap<>();
-  private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = new ConcurrentHashMap<>();
-  private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new LinkedBlockingQueue<>();
-
-  /**
-   * Initializes the locations of splits mapping.
-   *
-   * @param splits
-   */
-  public EvaluatorToPartitionMapper(V[] splits) {
-    try {
-      for (int splitNum = 0; splitNum < splits.length; splitNum++) {
-        LOG.log(Level.FINE, "Processing split: " + splitNum);
-        final V split = splits[splitNum];
-        final String[] locations = split.getLocations();
-        final NumberedSplit<V> numberedSplit = new NumberedSplit<V>(split, splitNum);
-        unallocatedSplits.add(numberedSplit);
-        for (final String location : locations) {
-          BlockingQueue<NumberedSplit<V>> newSplitQue = new LinkedBlockingQueue<NumberedSplit<V>>();
-          final BlockingQueue<NumberedSplit<V>> splitQue = locationToSplits.putIfAbsent(location,
-              newSplitQue);
-          if (splitQue != null) {
-            newSplitQue = splitQue;
-          }
-          newSplitQue.add(numberedSplit);
-        }
-      }
-      for (Map.Entry<String, BlockingQueue<NumberedSplit<V>>> locSplit : locationToSplits.entrySet()) {
-        LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString());
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Unable to get InputSplits using the specified InputFormat", e);
-    }
-  }
-
-  /**
-   * Get an input split to be assigned to this.
-   * evaluator
-   * <p/>
-   * Allocates one if its not already allocated
-   *
-   * @param evaluatorId
-   * @return
-   */
-  public NumberedSplit<V> getInputSplit(final String hostName, final String evaluatorId) {
-    synchronized (evaluatorToSplits) {
-      if (evaluatorToSplits.containsKey(evaluatorId)) {
-        LOG.log(Level.FINE, "Found an already allocated partition");
-        LOG.log(Level.FINE, evaluatorToSplits.toString());
-        return evaluatorToSplits.get(evaluatorId);
-      }
-    }
-    LOG.log(Level.FINE, "allocated partition not found");
-    if (locationToSplits.containsKey(hostName)) {
-      LOG.log(Level.FINE, "Found partitions possibly hosted for " + evaluatorId + " at " + hostName);
-      final NumberedSplit<V> split = allocateSplit(evaluatorId, locationToSplits.get(hostName));
-      LOG.log(Level.FINE, evaluatorToSplits.toString());
-      if (split != null) {
-        return split;
-      }
-    }
-    //pick random
-    LOG.log(
-        Level.FINE,
-        hostName
-            + " does not host any partitions or someone else took partitions hosted here. Picking a random one");
-    final NumberedSplit<V> split = allocateSplit(evaluatorId, unallocatedSplits);
-    LOG.log(Level.FINE, evaluatorToSplits.toString());
-    if (split != null) {
-      return split;
-    }
-    throw new RuntimeException("Unable to find an input partition to evaluator " + evaluatorId);
-  }
-
-  private NumberedSplit<V> allocateSplit(final String evaluatorId,
-                                         final BlockingQueue<NumberedSplit<V>> value) {
-    if (value == null) {
-      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
-      return null;
-    }
-    while (true) {
-      final NumberedSplit<V> split = value.poll();
-      if (split == null) {
-        return null;
-      }
-      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
-        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
-        final NumberedSplit<V> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
-        if (old != null) {
-          final String msg = "Trying to assign different partitions to the same evaluator " +
-              "is not supported";
-          LOG.severe(msg);
-          throw new RuntimeException(msg);
-        } else {
-          LOG.log(Level.FINE, "Returning " + split.getIndex());
-          return split;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
index dcaafb7..40b76ce 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
@@ -29,14 +29,16 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
 import org.apache.reef.io.data.loading.api.DataLoadingService;
 import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.BindException;
 
 import javax.inject.Inject;
-import java.io.IOException;
+
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Random;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -46,10 +48,10 @@ import java.util.logging.Logger;
  * that uses the Hadoop {@link InputFormat} to find
  * partitions of data & request resources.
  * <p/>
- * The InputFormat is injected using a Tang external constructor
+ * The InputFormat is taken from the job configurations
  * <p/>
- * It also tries to obtain data locality in a greedy
- * fashion using {@link EvaluatorToPartitionMapper}
+ * The {@link EvaluatorToPartitionStrategy} is injected via Tang,
+ * in order to support different ways to map evaluators to data
  */
 @DriverSide
 public class InputFormatLoadingService<K, V> implements DataLoadingService {
@@ -61,15 +63,20 @@ public class InputFormatLoadingService<K, V> implements DataLoadingService {
   private static final String COMPUTE_CONTEXT_PREFIX =
       "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";
 
-  private final EvaluatorToPartitionMapper<InputSplit> evaluatorToPartitionMapper;
-  private final int numberOfPartitions;
+  private final EvaluatorToPartitionStrategy<InputSplit> evaluatorToPartitionStrategy;
 
   private final boolean inMemory;
 
   private final String inputFormatClass;
 
-  private final String inputPath;
 
+  /**
+   * @deprecated since 0.12. Use the other constructor instead, which
+   *             allows specifying the strategy used to assign partitions to
+   *             evaluators. This one uses {@link SingleDataCenterEvaluatorToPartitionStrategy} by default.
+   *
+   */
+  @Deprecated
   @Inject
   public InputFormatLoadingService(
       final InputFormat<K, V> inputFormat,
@@ -78,40 +85,36 @@ public class InputFormatLoadingService<K, V> implements DataLoadingService {
       @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) final boolean inMemory,
       @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClass,
       @Parameter(JobConfExternalConstructor.InputPath.class) final String inputPath) {
+    this(new SingleDataCenterEvaluatorToPartitionStrategy(inputFormatClass, new HashSet<String>(
+        Arrays.asList(DistributedDataSetPartitionSerializer.serialize(new DistributedDataSetPartition(inputPath,
+            DistributedDataSetPartition.LOAD_INTO_ANY_LOCATION, numberOfDesiredSplits))))), inMemory, inputFormatClass);
+  }
 
+  @Inject
+  public InputFormatLoadingService(
+      final EvaluatorToPartitionStrategy<InputSplit> evaluatorToPartitionStrategy,
+      @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) final boolean inMemory,
+      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClass) {
     this.inMemory = inMemory;
     this.inputFormatClass = inputFormatClass;
-    this.inputPath = inputPath;
-
-
-    try {
-
-      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, numberOfDesiredSplits);
-      if (LOG.isLoggable(Level.FINEST)) {
-        LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
-      }
-
-      this.numberOfPartitions = inputSplits.length;
-      LOG.log(Level.FINE, "Number of partitions: {0}", this.numberOfPartitions);
-
-      this.evaluatorToPartitionMapper = new EvaluatorToPartitionMapper<>(inputSplits);
-
-    } catch (final IOException e) {
-      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
-    }
+    this.evaluatorToPartitionStrategy = evaluatorToPartitionStrategy;
   }
 
+  /**
+   * This method actually returns the total number of splits across all partitions of the data.
+   * We should probably rename it in the future.
+   */
   @Override
   public int getNumberOfPartitions() {
-    return this.numberOfPartitions;
+    return evaluatorToPartitionStrategy.getNumberOfSplits();
   }
 
   @Override
   public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {
 
     final NumberedSplit<InputSplit> numberedSplit =
-        this.evaluatorToPartitionMapper.getInputSplit(
-            allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
+        this.evaluatorToPartitionStrategy.getInputSplit(
+            allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
             allocatedEvaluator.getId());
 
     return ContextConfiguration.CONF
@@ -125,8 +128,8 @@ public class InputFormatLoadingService<K, V> implements DataLoadingService {
     try {
 
       final NumberedSplit<InputSplit> numberedSplit =
-          this.evaluatorToPartitionMapper.getInputSplit(
-              allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
+          this.evaluatorToPartitionStrategy.getInputSplit(
+              allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
               allocatedEvaluator.getId());
 
       final Configuration serviceConfiguration = ServiceConfiguration.CONF
@@ -139,7 +142,7 @@ public class InputFormatLoadingService<K, V> implements DataLoadingService {
               DataSet.class,
               this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
           .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
-          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
+          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, numberedSplit.getPath())
           .bindNamedParameter(
               InputSplitExternalConstructor.SerializedInputSplit.class,
               WritableSerializer.serialize(numberedSplit.getEntry()))

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..31bda53
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.tang.annotations.Parameter;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+/**
+ * This is an online version which satisfies requests based on the locations the
+ * users ask the data to be loaded, for multiple data center network topologies.
+ *
+ */
+@DriverSide
+@Unstable
+public final class MultiDataCenterEvaluatorToPartitionStrategy extends AbstractEvaluatorToPartitionStrategy {
+  private static final Logger LOG = Logger.getLogger(MultiDataCenterEvaluatorToPartitionStrategy.class.getName());
+
+  private static final String PATH_SEPARATOR = "/";
+  private static final String ANY = "*";
+  /**
+   * Normalized locations, sorted in reverse order so that iteration goes from
+   * most to least specific. For example: [/dc1/room1, /dc1].
+   * Filled in {@code updateLocations} and read in {@code tryAllocate}.
+   */
+  private Set<String> normalizedLocations;
+  /**
+   * Splits keyed by normalized location, used when the exact rack name match
+   * does not produce a split.
+   */
+  private ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> partialLocationsToSplits;
+
+
+  /**
+   * Passes both arguments straight to the superclass constructor.
+   *
+   * @param inputFormatClassName the value bound to the InputFormatClass named parameter
+   * @param serializedDataPartitions the value bound to the DistributedDataSetPartitions named parameter
+   */
+  @Inject
+  MultiDataCenterEvaluatorToPartitionStrategy(
+      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClassName,
+      @Parameter(DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class)
+      final Set<String> serializedDataPartitions) {
+    super(inputFormatClassName, serializedDataPartitions);
+  }
+
+  /**
+   * Creates the objects to be used in updateLocations and tryAllocate methods.
+   * NOTE(review): presumably invoked by the superclass before any call to
+   * updateLocations, which would explain why the two fields are not
+   * initialized at their declaration - confirm against the superclass.
+   */
+  @Override
+  protected void setUp() {
+    normalizedLocations = new TreeSet<>(Collections.reverseOrder());
+    partialLocationsToSplits = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * {@inheritDoc}
+   * Registers the split under its location in locationToSplits, under its
+   * normalized location in partialLocationsToSplits, and remembers the
+   * normalized location itself.
+   */
+  @Override
+  protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
+    final String location = numberedSplit.getLocation();
+    addLocationMapping(locationToSplits, numberedSplit, location);
+    final String normalizedLocation = normalize(location);
+    addLocationMapping(partialLocationsToSplits, numberedSplit, normalizedLocation);
+    normalizedLocations.add(normalizedLocation);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Tries to allocate on an exact rack name match first; if that does not
+   * produce a split, walks the normalized locations from most to least
+   * specific and tries every one the rack name starts with.
+   *
+   * @return the allocated split, or null if none of the locations produced one
+   */
+  @Override
+  protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
+    final String rackName = nodeDescriptor.getRackDescriptor().getName();
+    LOG.log(Level.FINE, "Trying an exact match on rack name {0}", rackName);
+    if (locationToSplits.containsKey(rackName)) {
+      LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[] {evaluatorId, rackName});
+      final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, locationToSplits.get(rackName));
+      if (split != null) {
+        return split;
+      }
+    }
+    LOG.fine("No success, trying based on a partial match on locations");
+    final Iterator<String> it = normalizedLocations.iterator();
+    while (it.hasNext()) {
+      final String possibleLocation = it.next();
+      LOG.log(Level.FINE, "Trying on possible location {0}", possibleLocation);
+      // NOTE(review): this is a plain string prefix test, so a location such as
+      // /dc1 also matches a rack named /dc10/room1 - confirm whether matching
+      // on whole path components was intended.
+      if (rackName.startsWith(possibleLocation)) {
+        LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1} for rack {2}", new Object[] {evaluatorId,
+            possibleLocation, rackName});
+        final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId,
+            partialLocationsToSplits.get(possibleLocation));
+        if (split != null) {
+          return split;
+        }
+      }
+    }
+    LOG.fine("Nothing found");
+    return null;
+  }
+
+  /**
+   * Appends the split to the queue registered for the location, creating and
+   * registering the queue first when the location is not in the map yet.
+   *
+   * @param concurrentMap the location to splits map to update
+   * @param numberedSplit the split to add
+   * @param location the key under which the split is added
+   */
+  private void addLocationMapping(final ConcurrentMap<String,
+      BlockingQueue<NumberedSplit<InputSplit>>> concurrentMap,
+      final NumberedSplit<InputSplit> numberedSplit, final String location) {
+    BlockingQueue<NumberedSplit<InputSplit>> splitQueue = concurrentMap.get(location);
+    if (splitQueue == null) {
+      final BlockingQueue<NumberedSplit<InputSplit>> newSplitQueue = new LinkedBlockingQueue<>();
+      // putIfAbsent hands back the queue another caller registered in the meantime, if any,
+      // so a queue that already holds splits is never replaced
+      final BlockingQueue<NumberedSplit<InputSplit>> raced = concurrentMap.putIfAbsent(location, newSplitQueue);
+      splitQueue = raced == null ? newSplitQueue : raced;
+    }
+    splitQueue.add(numberedSplit);
+  }
+
+  /**
+   * Brings a location into the form used as key for partial matches: it starts
+   * with the path separator and has no trailing ANY or path separator.
+   * A location that consists only of separators and ANY characters
+   * (for example "/", "*" or "/*") is normalized to the path separator.
+   *
+   * @param location the location as given by the user
+   * @return the normalized location, never empty
+   */
+  private String normalize(final String location) {
+    // should start with a separator
+    String normalized = location.startsWith(PATH_SEPARATOR) ? location : PATH_SEPARATOR + location;
+    // remove the ending ANY or path separator
+    while (normalized.endsWith(ANY) || normalized.endsWith(PATH_SEPARATOR)) {
+      normalized = normalized.substring(0, normalized.length() - 1);
+    }
+    // nothing left means the location was the root (this covers /* as well), return /
+    return normalized.isEmpty() ? PATH_SEPARATOR : normalized;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
index f3cbc97..a9594aa 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
@@ -18,24 +18,34 @@
  */
 package org.apache.reef.io.data.loading.impl;
 
+import org.apache.commons.lang.Validate;
+
 /**
  * A tuple of an object of type E and an integer index.
- * Used inside {@link EvaluatorToPartitionMapper} to
+ * Used inside {@link EvaluatorToPartitionStrategy} implementations to
  * mark the partitions associated with each {@link org.apache.hadoop.mapred.InputSplit}
  *
  * @param <E>
  */
-final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
+public final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
   private final E entry;
   private final int index;
+  private final DistributedDataSetPartition partition;
 
-  public NumberedSplit(final E entry, final int index) {
-    super();
-    if (entry == null) {
-      throw new IllegalArgumentException("Entry cannot be null");
-    }
+  public NumberedSplit(final E entry, final int index, final DistributedDataSetPartition partition) {
+    Validate.notNull(entry, "Entry cannot be null");
+    Validate.notNull(partition, "Partition cannot be null");
     this.entry = entry;
     this.index = index;
+    this.partition = partition;
+  }
+
+  public String getPath() {
+    return partition.getPath();
+  }
+
+  public String getLocation() {
+    return partition.getLocation();
   }
 
   public E getEntry() {
@@ -48,7 +58,7 @@ final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
 
   @Override
   public String toString() {
-    return "InputSplit-" + index;
+    return "InputSplit-" + partition + "-" + index;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..b0d0f6f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.tang.annotations.Parameter;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+/**
+ * This is an online version which satisfies
+ * requests in a greedy way, for single data center network topologies.
+ */
+@DriverSide
+public final class SingleDataCenterEvaluatorToPartitionStrategy extends AbstractEvaluatorToPartitionStrategy {
+  private static final Logger LOG = Logger
+      .getLogger(SingleDataCenterEvaluatorToPartitionStrategy.class.getName());
+
+  /**
+   * Passes both arguments straight to the superclass constructor.
+   *
+   * @param inputFormatClassName the value bound to the InputFormatClass named parameter
+   * @param serializedDataPartitions the value bound to the DistributedDataSetPartitions named parameter
+   */
+  @Inject
+  SingleDataCenterEvaluatorToPartitionStrategy(
+      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClassName,
+      @Parameter(DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class)
+      final Set<String> serializedDataPartitions) {
+    super(inputFormatClassName, serializedDataPartitions);
+  }
+
+  /**
+   * Adds the split to the queue of every location reported by the underlying
+   * {@link InputSplit}, creating the queue of a location on first use.
+   *
+   * @param numberedSplit the split to register
+   * @throws RuntimeException if the split cannot report its locations
+   */
+  @Override
+  protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
+    final String[] hosts;
+    try {
+      // asking the split for its locations is the only call here that throws IOException
+      hosts = numberedSplit.getEntry().getLocations();
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
+    }
+    for (final String host : hosts) {
+      final BlockingQueue<NumberedSplit<InputSplit>> freshQueue = new LinkedBlockingQueue<>();
+      // putIfAbsent returns null when freshQueue got registered, the existing queue otherwise
+      final BlockingQueue<NumberedSplit<InputSplit>> knownQueue = locationToSplits.putIfAbsent(host, freshQueue);
+      (knownQueue == null ? freshQueue : knownQueue).add(numberedSplit);
+    }
+  }
+
+  /**
+   * Allocates a split from the unallocated ones, without looking at the node descriptor.
+   *
+   * @return whatever allocateSplit returns for the unallocated splits
+   */
+  @Override
+  protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
+    LOG.fine("Picking a random split from the unallocated ones");
+    return allocateSplit(evaluatorId, unallocatedSplits);
+  }
+
+}