You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:01 UTC

[28/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java
new file mode 100644
index 0000000..6d5515e
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingDriverConfiguration.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.driver.parameters.EvaluatorAllocatedHandlers;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.time.Clock;
+
+public final class DataLoadingDriverConfiguration extends ConfigurationModuleBuilder {
+  public static final ConfigurationModule CONF = new DataLoadingDriverConfiguration()
+      .merge(DriverConfiguration.CONF)
+      .bindSetEntry(Clock.StartHandler.class, DataLoader.StartHandler.class)
+      .bindSetEntry(EvaluatorAllocatedHandlers.class, DataLoader.EvaluatorAllocatedHandler.class)
+      .build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
new file mode 100644
index 0000000..be479c6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
+import org.apache.reef.io.data.loading.impl.InputFormatExternalConstructor;
+import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
+import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.JavaConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+
+/**
+ * Builder to create a request to the DataLoadingService.
+ */
+public final class DataLoadingRequestBuilder
+    implements org.apache.reef.util.Builder<Configuration> {
+
+  private int memoryMB = -1;
+  private int numberOfCores = -1;
+  private int numberOfDesiredSplits = -1;
+  private EvaluatorRequest computeRequest = null;
+  private boolean inMemory = false;
+  private boolean renewFailedEvaluators = true;
+  private ConfigurationModule driverConfigurationModule = null;
+  private String inputFormatClass;
+  private String inputPath;
+
+  public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int numberOfDesiredSplits) {
+    this.numberOfDesiredSplits = numberOfDesiredSplits;
+    return this;
+  }
+
+  /**
+   * Set the memory to be used for Evaluator allocated.
+   *
+   * @param memoryMB the amount of memory in MB
+   * @return this
+   */
+  public DataLoadingRequestBuilder setMemoryMB(final int memoryMB) {
+    this.memoryMB = memoryMB;
+    return this;
+  }
+
+  /**
+   * Set the core number to be used for Evaluator allocated.
+   *
+   * @param numberOfCores the number of cores
+   * @return this
+   */
+  public DataLoadingRequestBuilder setNumberOfCores(final int numberOfCores) {
+    this.numberOfCores = numberOfCores;
+    return this;
+  }
+
+  public DataLoadingRequestBuilder setComputeRequest(final EvaluatorRequest computeRequest) {
+    this.computeRequest = computeRequest;
+    return this;
+  }
+
+  public DataLoadingRequestBuilder loadIntoMemory(final boolean inMemory) {
+    this.inMemory = inMemory;
+    return this;
+  }
+
+  public DataLoadingRequestBuilder renewFailedEvaluators(final boolean renewFailedEvaluators) {
+    this.renewFailedEvaluators = renewFailedEvaluators;
+    return this;
+  }
+
+  public DataLoadingRequestBuilder setDriverConfigurationModule(
+      final ConfigurationModule driverConfigurationModule) {
+    this.driverConfigurationModule = driverConfigurationModule;
+    return this;
+  }
+
+  public DataLoadingRequestBuilder setInputFormatClass(
+      final Class<? extends InputFormat> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass.getName();
+    return this;
+  }
+
+  public DataLoadingRequestBuilder setInputPath(final String inputPath) {
+    this.inputPath = inputPath;
+    return this;
+  }
+
+  @Override
+  public Configuration build() throws BindException {
+    if (this.driverConfigurationModule == null) {
+      throw new BindException("Driver Configuration Module is a required parameter.");
+    }
+
+    if (this.inputPath == null) {
+      throw new BindException("InputPath is a required parameter.");
+    }
+
+    if (this.inputFormatClass == null) {
+      this.inputFormatClass = TextInputFormat.class.getName();
+    }
+
+    final Configuration driverConfiguration;
+    if (renewFailedEvaluators) {
+      driverConfiguration = this.driverConfigurationModule
+          .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
+          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
+          .set(DriverConfiguration.ON_EVALUATOR_FAILED, DataLoader.EvaluatorFailedHandler.class)
+          .build();
+    } else {
+      driverConfiguration = this.driverConfigurationModule
+          .set(DriverConfiguration.ON_DRIVER_STARTED, DataLoader.StartHandler.class)
+          .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DataLoader.EvaluatorAllocatedHandler.class)
+          .build();
+    }
+
+    final JavaConfigurationBuilder jcb =
+        Tang.Factory.getTang().newConfigurationBuilder(driverConfiguration);
+
+    if (this.numberOfDesiredSplits > 0) {
+      jcb.bindNamedParameter(NumberOfDesiredSplits.class, "" + this.numberOfDesiredSplits);
+    }
+
+    if (this.memoryMB > 0) {
+      jcb.bindNamedParameter(DataLoadingEvaluatorMemoryMB.class, "" + this.memoryMB);
+    }
+
+    if (this.numberOfCores > 0) {
+      jcb.bindNamedParameter(DataLoadingEvaluatorNumberOfCores.class, "" + this.numberOfCores);
+    }
+
+    if (this.computeRequest != null) {
+      jcb.bindNamedParameter(DataLoadingComputeRequest.class,
+          EvaluatorRequestSerializer.serialize(this.computeRequest));
+    }
+
+    return jcb
+        .bindNamedParameter(LoadDataIntoMemory.class, Boolean.toString(this.inMemory))
+        .bindConstructor(InputFormat.class, InputFormatExternalConstructor.class)
+        .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
+        .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
+        .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
+        .bindImplementation(DataLoadingService.class, InputFormatLoadingService.class)
+        .build();
+  }
+
+  @NamedParameter(short_name = "num_splits", default_value = "0")
+  public static final class NumberOfDesiredSplits implements Name<Integer> {
+  }
+
+  @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB", default_value = "4096")
+  public static final class DataLoadingEvaluatorMemoryMB implements Name<Integer> {
+  }
+
+  @NamedParameter(short_name = "dataLoadingEvaluatorCore", default_value = "1")
+  public static final class DataLoadingEvaluatorNumberOfCores implements Name<Integer> {
+  }
+
+  @NamedParameter(default_value = "NULL")
+  public static final class DataLoadingComputeRequest implements Name<String> {
+  }
+
+  @NamedParameter(default_value = "false")
+  public static final class LoadDataIntoMemory implements Name<Boolean> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java
new file mode 100644
index 0000000..9461148
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingService.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.tang.Configuration;
+
+/**
+ * All data loading services should implement this interface.
+ */
+@DriverSide
+public interface DataLoadingService {
+
+  /**
+   * Access to the number of partitions suggested by this DataSource.
+   *
+   * @return the number of partitions suggested by this DataSource.
+   */
+  int getNumberOfPartitions();
+
+  /**
+   * @return the context configuration for the given Evaluator.
+   */
+  Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator);
+
+  /**
+   * @return the service configuration for the given Evaluator.
+   */
+  Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator);
+
+  /**
+   * @return Return the prefix to be used to enumerate
+   * context ids for compute requests fired other than
+   * the data load contexts.
+   */
+  String getComputeContextIdPrefix();
+
+  /**
+   * Distinguishes data loaded contexts from compute contexts.
+   *
+   * @return true if this context has been loaded with data.
+   */
+  boolean isDataLoadedContext(ActiveContext context);
+
+  /**
+   * @return true if this is a computation context,
+   * false otherwise. (e.g. this is a data loading context).
+   */
+  boolean isComputeContext(ActiveContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java
new file mode 100644
index 0000000..a15764d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.network.util.Pair;
+
+/**
+ * A view of the data set to be loaded
+ * at an evaluator as an iterable of
+ * key value pairs.
+ * <p/>
+ * Implementations need not materialize
+ * and clients should not assume that the
+ * data is materialized. Any such thing
+ * is left as a post-processing step.
+ * <p/>
+ * Client also can't assume that the iterator
+ * returned here can be restarted
+ *
+ * @param <K>
+ * @param <V>
+ */
+@TaskSide
+public interface DataSet<K, V> extends Iterable<Pair<K, V>> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java
new file mode 100644
index 0000000..ddf3b15
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/ResourceRequestHandler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class ResourceRequestHandler implements EventHandler<EvaluatorRequest> {
+
+  private static final Logger LOG = Logger.getLogger(ResourceRequestHandler.class.getName());
+
+  private final EvaluatorRequestor requestor;
+
+  private CountDownLatch resourceRequestGate = new CountDownLatch(1);
+
+  public ResourceRequestHandler(final EvaluatorRequestor requestor) {
+    this.requestor = requestor;
+  }
+
+  public void releaseResourceRequestGate() {
+    LOG.log(Level.FINE, "Releasing Gate");
+    this.resourceRequestGate.countDown();
+  }
+
+  @Override
+  public void onNext(final EvaluatorRequest request) {
+    try {
+
+      LOG.log(Level.FINE,
+          "Processing a request with count: {0} - Waiting for gate to be released",
+          request.getNumber());
+
+      this.resourceRequestGate.await();
+
+      LOG.log(Level.FINE, "Gate released. Submitting request: {0}", request);
+      this.resourceRequestGate = new CountDownLatch(1);
+      this.requestor.submit(request);
+
+    } catch (final InterruptedException ex) {
+      LOG.log(Level.FINEST, "Interrupted", ex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java
new file mode 100644
index 0000000..2f493c6
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorRequestSerializer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+
+import java.io.*;
+
+/**
+ * Serialize and deserialize EvaluatorRequest objects
+ * Currently only supports number & memory
+ * Does not take care of Resource Descriptor
+ */
+public class EvaluatorRequestSerializer {
+  public static String serialize(EvaluatorRequest request) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (DataOutputStream daos = new DataOutputStream(baos)) {
+
+        daos.writeInt(request.getNumber());
+        daos.writeInt(request.getMegaBytes());
+        daos.writeInt(request.getNumberOfCores());
+
+      } catch (IOException e) {
+        throw e;
+      }
+
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (IOException e1) {
+      throw new RuntimeException("Unable to serialize compute request", e1);
+    }
+  }
+
+  public static EvaluatorRequest deserialize(String serializedRequest) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRequest))) {
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        return EvaluatorRequest.newBuilder()
+            .setNumber(dais.readInt())
+            .setMemory(dais.readInt())
+            .setNumberOfCores(dais.readInt())
+            .build();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to de-serialize compute request", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
new file mode 100644
index 0000000..5a5c0ff
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.io.data.loading.api.DataLoadingService;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Class that tracks the mapping between
+ * evaluators & the data partition assigned
+ * to those evaluators. Its part of the
+ * implementation of a {@link DataLoadingService}
+ * that uses the Hadoop {@link InputFormat} to
+ * partition the data and request resources
+ * accordingly
+ * <p/>
+ * This is an online version which satisfies
+ * requests in a greedy way.
+ *
+ * @param <V>
+ */
+@DriverSide
+public class EvaluatorToPartitionMapper<V extends InputSplit> {
+  private static final Logger LOG = Logger
+      .getLogger(EvaluatorToPartitionMapper.class.getName());
+
+  private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> locationToSplits = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = new ConcurrentHashMap<>();
+  private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new LinkedBlockingQueue<>();
+
+  /**
+   * Initializes the locations of splits mapping
+   *
+   * @param splits
+   */
+  public EvaluatorToPartitionMapper(V[] splits) {
+    try {
+      for (int splitNum = 0; splitNum < splits.length; splitNum++) {
+        LOG.log(Level.FINE, "Processing split: " + splitNum);
+        final V split = splits[splitNum];
+        final String[] locations = split.getLocations();
+        final NumberedSplit<V> numberedSplit = new NumberedSplit<V>(split, splitNum);
+        unallocatedSplits.add(numberedSplit);
+        for (final String location : locations) {
+          BlockingQueue<NumberedSplit<V>> newSplitQue = new LinkedBlockingQueue<NumberedSplit<V>>();
+          final BlockingQueue<NumberedSplit<V>> splitQue = locationToSplits.putIfAbsent(location,
+              newSplitQue);
+          if (splitQue != null) {
+            newSplitQue = splitQue;
+          }
+          newSplitQue.add(numberedSplit);
+        }
+      }
+      for (Map.Entry<String, BlockingQueue<NumberedSplit<V>>> locSplit : locationToSplits.entrySet()) {
+        LOG.log(Level.FINE, locSplit.getKey() + ": " + locSplit.getValue().toString());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Unable to get InputSplits using the specified InputFormat", e);
+    }
+  }
+
+  /**
+   * Get an input split to be assigned to this
+   * evaluator
+   * <p/>
+   * Allocates one if its not already allocated
+   *
+   * @param evaluatorId
+   * @return
+   */
+  public NumberedSplit<V> getInputSplit(final String hostName, final String evaluatorId) {
+    synchronized (evaluatorToSplits) {
+      if (evaluatorToSplits.containsKey(evaluatorId)) {
+        LOG.log(Level.FINE, "Found an already allocated partition");
+        LOG.log(Level.FINE, evaluatorToSplits.toString());
+        return evaluatorToSplits.get(evaluatorId);
+      }
+    }
+    LOG.log(Level.FINE, "allocated partition not found");
+    if (locationToSplits.containsKey(hostName)) {
+      LOG.log(Level.FINE, "Found partitions possibly hosted for " + evaluatorId + " at " + hostName);
+      final NumberedSplit<V> split = allocateSplit(evaluatorId, locationToSplits.get(hostName));
+      LOG.log(Level.FINE, evaluatorToSplits.toString());
+      if (split != null) {
+        return split;
+      }
+    }
+    //pick random
+    LOG.log(
+        Level.FINE,
+        hostName
+            + " does not host any partitions or someone else took partitions hosted here. Picking a random one");
+    final NumberedSplit<V> split = allocateSplit(evaluatorId, unallocatedSplits);
+    LOG.log(Level.FINE, evaluatorToSplits.toString());
+    if (split != null) {
+      return split;
+    }
+    throw new RuntimeException("Unable to find an input partition to evaluator " + evaluatorId);
+  }
+
+  private NumberedSplit<V> allocateSplit(final String evaluatorId,
+                                         final BlockingQueue<NumberedSplit<V>> value) {
+    if (value == null) {
+      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
+      return null;
+    }
+    while (true) {
+      final NumberedSplit<V> split = value.poll();
+      if (split == null)
+        return null;
+      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
+        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
+        final NumberedSplit<V> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
+        if (old != null) {
+          final String msg = "Trying to assign different partitions to the same evaluator " +
+              "is not supported";
+          LOG.severe(msg);
+          throw new RuntimeException(msg);
+        } else {
+          LOG.log(Level.FINE, "Returning " + split.getIndex());
+          return split;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java
new file mode 100644
index 0000000..e729aaf
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InMemoryInputFormatDataSet.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.network.util.Pair;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class InMemoryInputFormatDataSet<K extends WritableComparable<K>, V extends Writable>
+    implements DataSet<K, V> {
+
+  private final InputFormatDataSet<K, V> inputFormatDataSet;
+  private List<Pair<K, V>> recordsList = null;
+
+  @Inject
+  public InMemoryInputFormatDataSet(InputFormatDataSet<K, V> inputFormatDataSet) {
+    this.inputFormatDataSet = inputFormatDataSet;
+  }
+
+
+  @Override
+  public synchronized Iterator<Pair<K, V>> iterator() {
+    if (recordsList == null) {
+      recordsList = new ArrayList<>();
+      for (final Pair<K, V> keyValue : inputFormatDataSet) {
+        recordsList.add(keyValue);
+      }
+    }
+    return recordsList.iterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java
new file mode 100644
index 0000000..a7632f3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatDataSet.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.*;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.network.util.Pair;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * An implementation of {@link DataSet} that reads records using a RecordReader
+ * encoded inside an InputSplit.
+ * <p/>
+ * The input split is injected through an external constructor by deserializing
+ * the input split assigned to this evaluator.
+ *
+ * @param <K>
+ * @param <V>
+ */
+@TaskSide
+public final class
+    InputFormatDataSet<K extends WritableComparable<K>, V extends Writable>
+    implements DataSet<K, V> {
+
+  private final DummyReporter dummyReporter = new DummyReporter();
+  private final JobConf jobConf;
+  private final InputFormat<K, V> inputFormat;
+  private final InputSplit split;
+  private RecordReader lastRecordReader = null;
+
+  @Inject
+  public InputFormatDataSet(final InputSplit split, final JobConf jobConf) {
+    this.jobConf = jobConf;
+    this.inputFormat = this.jobConf.getInputFormat();
+    this.split = split;
+  }
+
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    try {
+
+      final RecordReader newRecordReader =
+          this.inputFormat.getRecordReader(this.split, this.jobConf, this.dummyReporter);
+
+      if (newRecordReader == this.lastRecordReader) {
+        throw new RuntimeException("Received the same record reader again. This isn't supported.");
+      }
+
+      this.lastRecordReader = newRecordReader;
+      return new RecordReaderIterator(newRecordReader);
+
+    } catch (final IOException ex) {
+      throw new RuntimeException("Can't instantiate iterator.", ex);
+    }
+  }
+
+  private final class RecordReaderIterator implements Iterator<Pair<K, V>> {
+
+    private final RecordReader<K, V> recordReader;
+    private Pair<K, V> recordPair;
+    private boolean hasNext;
+
+    RecordReaderIterator(final RecordReader<K, V> recordReader) {
+      this.recordReader = recordReader;
+      fetchRecord();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return this.hasNext;
+    }
+
+    @Override
+    public Pair<K, V> next() {
+      final Pair<K, V> prevRecordPair = this.recordPair;
+      fetchRecord();
+      return prevRecordPair;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported on RecordReader iterator");
+    }
+
+    private void fetchRecord() {
+      this.recordPair = new Pair<>(this.recordReader.createKey(), this.recordReader.createValue());
+      try {
+        this.hasNext = this.recordReader.next(this.recordPair.first, this.recordPair.second);
+      } catch (final IOException ex) {
+        throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", ex);
+      }
+    }
+  }
+
+  private final class DummyReporter implements Reporter {
+
+    @Override
+    public void progress() {
+    }
+
+    @Override
+    public Counter getCounter(final Enum<?> key) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(final String group, final String name) {
+      return null;
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("This is a Fake Reporter");
+    }
+
+    @Override
+    public float getProgress() {
+      return 0;
+    }
+
+    @Override
+    public void incrCounter(final Enum<?> key, final long amount) {
+    }
+
+    @Override
+    public void incrCounter(final String group, final String counter, final long amount) {
+    }
+
+    @Override
+    public void setStatus(final String status) {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java
new file mode 100644
index 0000000..aef6226
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatExternalConstructor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.tang.ExternalConstructor;
+
+import javax.inject.Inject;
+
+
+/**
+ * A Tang External Constructor to inject the required
+ * InputFormat
+ */
+@DriverSide
+public class InputFormatExternalConstructor implements ExternalConstructor<InputFormat<?, ?>> {
+
+  private final JobConf jobConf;
+  private final InputFormat<?, ?> inputFormat;
+
+  @Inject
+  public InputFormatExternalConstructor(final JobConf jobConf) {
+    this.jobConf = jobConf;
+    inputFormat = jobConf.getInputFormat();
+  }
+
+  @Override
+  public InputFormat<?, ?> newInstance() {
+    return inputFormat;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
new file mode 100644
index 0000000..a2c9cb1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.ServiceConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
+import org.apache.reef.io.data.loading.api.DataLoadingService;
+import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An implementation of {@link DataLoadingService}
+ * that uses the Hadoop {@link InputFormat} to find
+ * partitions of data & request resources.
+ * <p/>
+ * The InputFormat is injected using a Tang external constructor
+ * <p/>
+ * It also tries to obtain data locality in a greedy
+ * fashion using {@link EvaluatorToPartitionMapper}
+ */
+@DriverSide
+public class InputFormatLoadingService<K, V> implements DataLoadingService {
+
+  private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());
+
+  private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";
+
+  private static final String COMPUTE_CONTEXT_PREFIX =
+      "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";
+
+  private final EvaluatorToPartitionMapper<InputSplit> evaluatorToPartitionMapper;
+  private final int numberOfPartitions;
+
+  private final boolean inMemory;
+
+  private final String inputFormatClass;
+
+  private final String inputPath;
+
+  @Inject
+  public InputFormatLoadingService(
+      final InputFormat<K, V> inputFormat,
+      final JobConf jobConf,
+      final @Parameter(DataLoadingRequestBuilder.NumberOfDesiredSplits.class) int numberOfDesiredSplits,
+      final @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) boolean inMemory,
+      final @Parameter(JobConfExternalConstructor.InputFormatClass.class) String inputFormatClass,
+      final @Parameter(JobConfExternalConstructor.InputPath.class) String inputPath) {
+
+    this.inMemory = inMemory;
+    this.inputFormatClass = inputFormatClass;
+    this.inputPath = inputPath;
+
+
+    try {
+
+      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, numberOfDesiredSplits);
+      if (LOG.isLoggable(Level.FINEST)) {
+        LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
+      }
+
+      this.numberOfPartitions = inputSplits.length;
+      LOG.log(Level.FINE, "Number of partitions: {0}", this.numberOfPartitions);
+
+      this.evaluatorToPartitionMapper = new EvaluatorToPartitionMapper<>(inputSplits);
+
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
+    }
+  }
+
+  @Override
+  public int getNumberOfPartitions() {
+    return this.numberOfPartitions;
+  }
+
+  @Override
+  public Configuration getContextConfiguration(final AllocatedEvaluator allocatedEvaluator) {
+
+    final NumberedSplit<InputSplit> numberedSplit =
+        this.evaluatorToPartitionMapper.getInputSplit(
+            allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
+            allocatedEvaluator.getId());
+
+    return ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, DATA_LOAD_CONTEXT_PREFIX + numberedSplit.getIndex())
+        .build();
+  }
+
+  @Override
+  public Configuration getServiceConfiguration(final AllocatedEvaluator allocatedEvaluator) {
+
+    try {
+
+      final NumberedSplit<InputSplit> numberedSplit =
+          this.evaluatorToPartitionMapper.getInputSplit(
+              allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
+              allocatedEvaluator.getId());
+
+      final Configuration serviceConfiguration = ServiceConfiguration.CONF
+          .set(ServiceConfiguration.SERVICES,
+              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
+          .build();
+
+      return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
+          .bindImplementation(
+              DataSet.class,
+              this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class)
+          .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, inputFormatClass)
+          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, inputPath)
+          .bindNamedParameter(
+              InputSplitExternalConstructor.SerializedInputSplit.class,
+              WritableSerializer.serialize(numberedSplit.getEntry()))
+          .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
+          .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
+          .build();
+
+    } catch (final BindException ex) {
+      final String evalId = allocatedEvaluator.getId();
+      final String msg = "Unable to create configuration for evaluator " + evalId;
+      LOG.log(Level.WARNING, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  @Override
+  public String getComputeContextIdPrefix() {
+    return COMPUTE_CONTEXT_PREFIX;
+  }
+
+  @Override
+  public boolean isComputeContext(final ActiveContext context) {
+    return context.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
+  }
+
+  @Override
+  public boolean isDataLoadedContext(final ActiveContext context) {
+    return context.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java
new file mode 100644
index 0000000..cedad2d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputSplitExternalConstructor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * A Tang external constructor to inject an InputSplit
+ * by deserializing the serialized input split assigned
+ * to this evaluator
+ */
+@TaskSide
+public class InputSplitExternalConstructor implements ExternalConstructor<InputSplit> {
+
+  private final InputSplit inputSplit;
+
+  @Inject
+  public InputSplitExternalConstructor(
+      final JobConf jobConf,
+      @Parameter(SerializedInputSplit.class) final String serializedInputSplit) {
+    this.inputSplit = WritableSerializer.deserialize(serializedInputSplit, jobConf);
+  }
+
+  @Override
+  public InputSplit newInstance() {
+    return inputSplit;
+  }
+
+  @NamedParameter
+  public static final class SerializedInputSplit implements Name<String> {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java
new file mode 100644
index 0000000..67be9a7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/JobConfExternalConstructor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class JobConfExternalConstructor implements ExternalConstructor<JobConf> {
+
+  private static final Logger LOG = Logger.getLogger(JobConfExternalConstructor.class.getName());
+
+  private final String inputFormatClassName;
+  private final String inputPath;
+
+  @Inject
+  public JobConfExternalConstructor(
+      final @Parameter(InputFormatClass.class) String inputFormatClassName,
+      final @Parameter(InputPath.class) String inputPath) {
+    this.inputFormatClassName = inputFormatClassName;
+    this.inputPath = inputPath;
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public JobConf newInstance() {
+
+    final JobConf jobConf = new JobConf();
+
+    try {
+
+      final Class<? extends InputFormat> inputFormatClass =
+          (Class<? extends InputFormat>) Class.forName(this.inputFormatClassName);
+
+      jobConf.setInputFormat(inputFormatClass);
+
+      final Method addInputPath =
+          inputFormatClass.getMethod("addInputPath", JobConf.class, Path.class);
+
+      addInputPath.invoke(inputFormatClass, jobConf, new Path(this.inputPath));
+
+    } catch (final ClassNotFoundException ex) {
+      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
+          + " ClassNotFoundException while creating newInstance of JobConf", ex);
+    } catch (final InvocationTargetException | IllegalAccessException ex) {
+      throw new RuntimeException("InputFormat: " + this.inputFormatClassName
+          + ".addInputPath() method exists, but cannot be called.", ex);
+    } catch (final NoSuchMethodException ex) {
+      LOG.log(Level.INFO, "{0}.addInputPath() method does not exist", this.inputFormatClassName);
+    }
+
+    return jobConf;
+  }
+
+  @NamedParameter()
+  public static final class InputFormatClass implements Name<String> {
+  }
+
+  @NamedParameter(default_value = "NULL")
+  public static final class InputPath implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
new file mode 100644
index 0000000..6cef3b7
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+
+/**
+ * A tuple of an object of type E and an integer index
+ * Used inside {@link EvaluatorToPartitionMapper} to
+ * mark the partitions associated with each {@link InputSplit}
+ *
+ * @param <E>
+ */
+final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
+  private final E entry;
+  private final int index;
+
+  public NumberedSplit(final E entry, final int index) {
+    super();
+    if (entry == null) {
+      throw new IllegalArgumentException("Entry cannot be null");
+    }
+    this.entry = entry;
+    this.index = index;
+  }
+
+  public E getEntry() {
+    return entry;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  @Override
+  public String toString() {
+    return "InputSplit-" + index;
+  }
+
+  @Override
+  public int compareTo(final NumberedSplit<E> o) {
+    if (this.index == o.index)
+      return 0;
+    if (this.index < o.index)
+      return -1;
+    else
+      return 1;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java
new file mode 100644
index 0000000..80264b1
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/WritableSerializer.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.reef.io.serialization.Codec;
+
+import java.io.*;
+
+/**
+ * A serializer class that serializes {@link Writable}s
+ * into String using the below {@link Codec} that
+ * encodes & decodes {@link Writable}s
+ * By default this stores the class name in the serialized
+ * form so that the specific type can be instantiated on
+ * de-serialization. However, this also needs the jobconf
+ * to passed in while de-serialization
+ */
+public class WritableSerializer {
+  public static <E extends Writable> String serialize(E writable) {
+    final WritableCodec<E> writableCodec = new WritableCodec<>();
+    return Base64.encodeBase64String(writableCodec.encode(writable));
+  }
+
+  public static <E extends Writable> E deserialize(String serializedWritable) {
+    final WritableCodec<E> writableCodec = new WritableCodec<>();
+    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
+  }
+
+  public static <E extends Writable> E deserialize(String serializedWritable, JobConf jobConf) {
+    final WritableCodec<E> writableCodec = new WritableCodec<>(jobConf);
+    return writableCodec.decode(Base64.decodeBase64(serializedWritable));
+  }
+
+  static class WritableCodec<E extends Writable> implements Codec<E> {
+    private final JobConf jobConf;
+
+    public WritableCodec(JobConf jobConf) {
+      this.jobConf = jobConf;
+    }
+
+    public WritableCodec() {
+      this.jobConf = new JobConf();
+    }
+
+    @Override
+    public E decode(byte[] bytes) {
+      final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      try (DataInputStream dais = new DataInputStream(bais)) {
+        final String className = dais.readUTF();
+        E writable = (E) ReflectionUtils.newInstance(Class.forName(className), jobConf);
+        writable.readFields(dais);
+        return writable;
+      } catch (IOException e) {
+        throw new RuntimeException("Could not de-serialize JobConf", e);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("Could not instantiate specific writable class", e);
+      }
+    }
+
+    @Override
+    public byte[] encode(E writable) {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try (final DataOutputStream daos = new DataOutputStream(baos)) {
+        daos.writeUTF(writable.getClass().getName());
+        writable.write(daos);
+        return baos.toByteArray();
+      } catch (IOException e) {
+        throw new RuntimeException("Could not serialize JobConf", e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
new file mode 100644
index 0000000..718a576
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Cache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Cache for network and naming services
+ */
+public interface Cache<K, V> {
+  /**
+   *  Constructs with timeout
+   *  key is evicted when it's not used for timeout milli-seconds
+   */
+
+  /**
+   * Returns a value for the key if cached; otherwise creates, caches and returns
+   * When it creates a value for a key, only one callable for the key is executed
+   *
+   * @param key      a key
+   * @param callable a value fetcher
+   * @return a value
+   * @throws NetworkException
+   */
+  public V get(K key, Callable<V> valueFetcher) throws ExecutionException;
+
+  /**
+   * Invalidates a key from the cache
+   *
+   * @param key a key
+   */
+  public void invalidate(K key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
new file mode 100644
index 0000000..42d05f9
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Connection.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+
+/**
+ * Connection between two end-points named by identifiers.
+ *
+ * @param <T> type
+ */
+public interface Connection<T> extends AutoCloseable {
+
+  /**
+   * Opens the connection.
+   *
+   * @throws NetworkException
+   */
+  void open() throws NetworkException;
+
+  /**
+   * Writes an object to the connection.
+   *
+   * @param obj
+   * @throws NetworkException
+   */
+  void write(T obj) throws NetworkException;
+
+  /**
+   * Closes the connection.
+   *
+   * @throws NetworkException
+   */
+  @Override
+  void close() throws NetworkException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java
new file mode 100644
index 0000000..40b28b0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/ConnectionFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Factory that creates a new connection
+ *
+ * @param <T> type
+ */
+public interface ConnectionFactory<T> {
+
+  /**
+   * Creates a new connection
+   *
+   * @param destId a destination identifier
+   * @return a connection
+   */
+  public Connection<T> newConnection(Identifier destId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java
new file mode 100644
index 0000000..1aaa7e0
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/Message.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.wake.Identifier;
+
+/**
+ * Network message
+ *
+ * @param <T>
+ */
+public interface Message<T> {
+
+  /**
+   * Gets a source identifier
+   *
+   * @return an identifier
+   */
+  Identifier getSrcId();
+
+  /**
+   * Gets a destination identifier
+   *
+   * @return an identifier
+   */
+  Identifier getDestId();
+
+  /**
+   * Gets data
+   *
+   * @return an iterable of data objects
+   */
+  Iterable<T> getData();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
new file mode 100644
index 0000000..1443a46
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/TransportFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+
+/**
+ * Factory that creates a transport
+ */
+public interface TransportFactory {
+
+  /**
+   * Creates a transport
+   *
+   * @param port          a listening port
+   * @param clientHandler a transport client-side handler
+   * @param serverHandler a transport server-side handler
+   * @param exHandler     an exception handler
+   * @return
+   */
+  public Transport create(int port,
+                          EventHandler<TransportEvent> clientHandler,
+                          EventHandler<TransportEvent> serverHandler,
+                          EventHandler<Exception> exHandler);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java
new file mode 100644
index 0000000..4424e8d
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/NetworkRuntimeException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.exception;
+
+/**
+ * Network service resourcemanager exception
+ */
+public class NetworkRuntimeException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs a new network resourcemanager exception with the specified detail message and cause
+   *
+   * @param s the detailed message
+   * @param e the cause
+   */
+  public NetworkRuntimeException(String s, Throwable e) {
+    super(s, e);
+  }
+
+  /**
+   * Constructs a new network resourcemanager exception with the specified detail message
+   *
+   * @param s the detailed message
+   */
+  public NetworkRuntimeException(String s) {
+    super(s);
+  }
+
+  /**
+   * Constructs a new network resourcemanager exception with the specified cause
+   *
+   * @param e the cause
+   */
+  public NetworkRuntimeException(Throwable e) {
+    super(e);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
new file mode 100644
index 0000000..6e406d3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/ParentDeadException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.exception;
+
+/**
+ * This exception is thrown by a child task
+ * when it is told that its Parent died
+ */
+public class ParentDeadException extends Exception {
+
+  private static final long serialVersionUID = 7636542209271151579L;
+
+  public ParentDeadException() {
+  }
+
+  public ParentDeadException(final String message) {
+    super(message);
+  }
+
+  public ParentDeadException(final Throwable cause) {
+    super(cause);
+  }
+
+  public ParentDeadException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+
+  public ParentDeadException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java
new file mode 100644
index 0000000..df0d392
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/exception/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.exception;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java
new file mode 100644
index 0000000..e8c7842
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNSToTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.IdentifierFactory;
+
+import javax.inject.Inject;
+
+public class BindNSToTask implements EventHandler<TaskStart> {
+
+  private final NetworkService<?> ns;
+  private final IdentifierFactory idFac;
+
+  @Inject
+  public BindNSToTask(
+      final NetworkService<?> ns,
+      final @Parameter(NetworkServiceParameters.NetworkServiceIdentifierFactory.class) IdentifierFactory idFac) {
+    this.ns = ns;
+    this.idFac = idFac;
+  }
+
+  @Override
+  public void onNext(final TaskStart task) {
+    this.ns.registerId(this.idFac.getNewInstance(task.getId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
new file mode 100644
index 0000000..e169624
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.TransportFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.NetUtils;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
+
+import javax.inject.Inject;
+
+/**
+ * Factory that creates a messaging transport
+ */
+public class MessagingTransportFactory implements TransportFactory {
+
+  @Inject
+  public MessagingTransportFactory() {
+  }
+
+  /**
+   * Creates a transport
+   *
+   * @param port          a listening port
+   * @param clientHandler a transport client side handler
+   * @param serverHandler a transport server side handler
+   * @param exHandler     a exception handler
+   */
+  @Override
+  public Transport create(final int port,
+                          final EventHandler<TransportEvent> clientHandler,
+                          final EventHandler<TransportEvent> serverHandler,
+                          final EventHandler<Exception> exHandler) {
+
+    final Transport transport = new NettyMessagingTransport(NetUtils.getLocalAddress(),
+        port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 3, 10000);
+
+    transport.registerErrorHandler(exHandler);
+    return transport;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
new file mode 100644
index 0000000..a4b4538
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A connection from the network service
+ */
+class NSConnection<T> implements Connection<T> {
+
+  private static final Logger LOG = Logger.getLogger(NSConnection.class.getName());
+
+  private final Identifier srcId;
+  private final Identifier destId;
+  private final LinkListener<NSMessage<T>> listener;
+  private final NetworkService<T> service;
+  private final Codec<NSMessage<T>> codec;
+
+  // link can change when an endpoint physical address changes
+  private Link<NSMessage<T>> link;
+
+  /**
+   * Constructs a connection
+   *
+   * @param srcId    a source identifier
+   * @param destId   a destination identifier
+   * @param listener a link listener
+   * @param service  a network service
+   */
+  NSConnection(final Identifier srcId, final Identifier destId,
+               final LinkListener<T> listener, final NetworkService<T> service) {
+    this.srcId = srcId;
+    this.destId = destId;
+    this.listener = new NSMessageLinkListener<>(listener);
+    this.service = service;
+    this.codec = new NSMessageCodec<>(service.getCodec(), service.getIdentifierFactory());
+  }
+
+  /**
+   * Opens the connection.
+   */
+  @Override
+  public void open() throws NetworkException {
+
+    LOG.log(Level.FINE, "looking up {0}", this.destId);
+
+    try {
+      // naming lookup
+      final InetSocketAddress addr = this.service.getNameClient().lookup(this.destId);
+      if (addr == null) {
+        final NetworkException ex = new NamingException("Cannot resolve " + this.destId);
+        LOG.log(Level.WARNING, "Cannot resolve " + this.destId, ex);
+        throw ex;
+      }
+
+      LOG.log(Level.FINE, "Resolved {0} to {1}", new Object[]{this.destId, addr});
+
+      // connect to a remote address
+      this.link = this.service.getTransport().open(addr, this.codec, this.listener);
+      LOG.log(Level.FINE, "Transport returned a link {0}", this.link);
+
+    } catch (final Exception ex) {
+      LOG.log(Level.WARNING, "Could not open " + this.destId, ex);
+      throw new NetworkException(ex);
+    }
+  }
+
+  /**
+   * Writes an object to the connection
+   *
+   * @param obj an object of type T
+   * @throws a network exception
+   */
+  @Override
+  public void write(final T obj) throws NetworkException {
+    try {
+      this.link.write(new NSMessage<T>(this.srcId, this.destId, obj));
+    } catch (final IOException ex) {
+      LOG.log(Level.WARNING, "Could not write to " + this.destId, ex);
+      throw new NetworkException(ex);
+    }
+  }
+
+  /**
+   * Closes the connection and unregisters it from the service
+   */
+  @Override
+  public void close() throws NetworkException {
+    this.service.remove(this.destId);
+  }
+}
+
+/**
+ * No-op link listener
+ *
+ * @param <T>
+ */
+final class NSMessageLinkListener<T> implements LinkListener<NSMessage<T>> {
+
+  NSMessageLinkListener(final LinkListener<T> listener) {
+  }
+
+  @Override
+  public void messageReceived(final NSMessage<T> message) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java
new file mode 100644
index 0000000..063b9e3
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSMessage.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.impl;
+
+import org.apache.reef.io.network.Message;
+import org.apache.reef.wake.Identifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Network service message that implements the Message interface
+ *
+ * @param <T> type
+ */
+public class NSMessage<T> implements Message<T> {
+  private final Identifier srcId;
+  private final Identifier destId;
+  private final List<T> data;
+
+  /**
+   * Constructs a network service message
+   *
+   * @param srcId  a source identifier
+   * @param destId a destination identifier
+   * @param data   data of type T
+   */
+  public NSMessage(final Identifier srcId, final Identifier destId, final T data) {
+    this.srcId = srcId;
+    this.destId = destId;
+    this.data = new ArrayList<T>(1);
+    this.data.add(data);
+  }
+
+  /**
+   * Constructs a network service message
+   *
+   * @param srcId  a source identifier
+   * @param destId a destination identifier
+   * @param data   a list of data of type T
+   */
+  public NSMessage(final Identifier srcId, final Identifier destId, final List<T> data) {
+    this.srcId = srcId;
+    this.destId = destId;
+    this.data = data;
+  }
+
+  /**
+   * Gets a source identifier
+   *
+   * @return an identifier
+   */
+  public Identifier getSrcId() {
+    return srcId;
+  }
+
+  /**
+   * Gets a destination identifier
+   *
+   * @return an identifier
+   */
+  public Identifier getDestId() {
+    return destId;
+  }
+
+  /**
+   * Gets data
+   *
+   * @return data
+   */
+  public List<T> getData() {
+    return data;
+  }
+}